From 3a1f4eac4fd44e1196f56afd9b72b1c0c6a8a36b Mon Sep 17 00:00:00 2001 From: Jonathan Karlsen Date: Tue, 3 Dec 2024 15:21:27 +0100 Subject: [PATCH] Have snapshots overwrite fm status if next fm starts This commit makes snapshots overwrite non-finalized states for forwardmodels if the next forwardmodel has started. This will avoid forwardmodels being stuck in running/pending if an event is lost. This is a safe assumption as no forwardmodel will start running before the previous one has finished successfully. --- .../forward_model_runner/reporting/event.py | 2 +- src/ert/ensemble_evaluator/snapshot.py | 45 +++++++++++++++- .../test_ensemble_evaluator.py | 2 +- .../ensemble_evaluator/test_snapshot.py | 53 +++++++++++++++++++ tests/ert/unit_tests/test_tracking.py | 2 +- 5 files changed, 99 insertions(+), 5 deletions(-) diff --git a/src/_ert/forward_model_runner/reporting/event.py b/src/_ert/forward_model_runner/reporting/event.py index f4f140232e1..3d55665c024 100644 --- a/src/_ert/forward_model_runner/reporting/event.py +++ b/src/_ert/forward_model_runner/reporting/event.py @@ -67,7 +67,6 @@ def __init__(self, evaluator_url, token=None, cert_path=None): self._cert = f.read() else: self._cert = None - self._statemachine = StateMachine() self._statemachine.add_handler((Init,), self._init_handler) self._statemachine.add_handler((Start, Running, Exited), self._job_handler) @@ -163,6 +162,7 @@ def _job_handler(self, msg: Union[Start, Running, Exited]): if msg.success(): logger.debug(f"Job {job_name} exited successfully") self._dump_event(ForwardModelStepSuccess(**job_msg)) + else: logger.error( _JOB_EXIT_FAILED_STRING.format( diff --git a/src/ert/ensemble_evaluator/snapshot.py b/src/ert/ensemble_evaluator/snapshot.py index f2027f4e964..6ca43e4cb0d 100644 --- a/src/ert/ensemble_evaluator/snapshot.py +++ b/src/ert/ensemble_evaluator/snapshot.py @@ -276,7 +276,6 @@ def update_from_event( ) -> "EnsembleSnapshot": e_type = type(event) timestamp = event.time - if source_snapshot is None: source_snapshot = EnsembleSnapshot() if e_type in get_args(RealizationEvent): @@ -364,7 +363,9 @@ def update_from_event( event.fm_step, fm, ) - + self._make_sure_previous_fm_is_not_running( + event.real, event.fm_step, source_snapshot + ) elif e_type in get_args(EnsembleEvent): event = cast(EnsembleEvent, event) self._ensemble_state = _ENSEMBLE_TYPE_EVENT_TO_STATUS[type(event)] @@ -385,6 +386,46 @@ def update_fm_step( self._fm_step_snapshots[real_id, fm_step_id].update(fm_step) return self + def _make_sure_previous_fm_is_not_running( + self, real_id: str, fm_step_id: str, source_snapshot: "EnsembleSnapshot" + ) -> None: + if fm_step_id == "0": + return + previous_fm_step_id = str(int(fm_step_id) - 1) + previous_fm_step = source_snapshot._fm_step_snapshots[ + real_id, previous_fm_step_id + ] + current_fm_start_time_from_source_snapshot = source_snapshot._fm_step_snapshots[ + real_id, fm_step_id + ].get("start_time") + current_fm_start_time_from_update_snapshot = self._fm_step_snapshots[ + real_id, fm_step_id + ].get("start_time") + if "status" in previous_fm_step and previous_fm_step["status"] not in [ + "Failed", + "Finished", + ]: + logger.error( + ( + f"Did not get finished event for {real_id=} {previous_fm_step_id=}, " + "but next fm was started so we assume it finished successfully and carry on." + ) + ) + self._fm_step_snapshots[real_id, previous_fm_step_id]["status"] = "Finished" + self._fm_step_snapshots[real_id, previous_fm_step_id]["end_time"] = ( + current_fm_start_time_from_source_snapshot + or current_fm_start_time_from_update_snapshot + ) + previous_fm_step["status"] = "Finished" + previous_fm_step["end_time"] = ( + current_fm_start_time_from_source_snapshot + or current_fm_start_time_from_update_snapshot + ) + # might affect many of the previous FMs + self._make_sure_previous_fm_is_not_running( + real_id, previous_fm_step_id, source_snapshot + ) + class FMStepSnapshot(TypedDict, total=False): status: Optional[str] diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py index fdae28e50a0..eebe768a9ce 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py @@ -298,7 +298,7 @@ def check_if_all_fm_running(snapshot: EnsembleSnapshot) -> bool: ) assert ( snapshot.get_fm_step("1", "0")["status"] - == FORWARD_MODEL_STATE_RUNNING + == FORWARD_MODEL_STATE_FINISHED ) assert ( snapshot.get_fm_step("1", "1")["status"] diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_snapshot.py b/tests/ert/unit_tests/ensemble_evaluator/test_snapshot.py index f0492b1bb92..035bee277db 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_snapshot.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_snapshot.py @@ -1,8 +1,10 @@ +import logging from datetime import datetime from _ert.events import ( ForwardModelStepFailure, ForwardModelStepRunning, + ForwardModelStepStart, ForwardModelStepSuccess, RealizationSuccess, ) @@ -103,3 +105,54 @@ def test_that_realization_success_message_updates_state(): assert ( snapshot.to_dict()["reals"]["0"]["status"] == state.REALIZATION_STATE_FINISHED ) + + +def test_fm_updates_previous_fm_if_it_is_stuck_in_nonfinalized_state(caplog): + """In case a snapshot event is lost (if connection is dropped), a forwardmodel can be stuck in a non-finalized state. + This should be handled, and it is safe to assume it has already finished if the next forwardmodel has started. + We also don't know the end_time of the prior forwardmodel if we have lost a finished/exited event, + so we set it to the start_time of the next forwardmodel.""" + caplog.set_level(logging.ERROR) + + main_snapshot = EnsembleSnapshot() + main_snapshot._fm_step_snapshots["0", "0"] = FMStepSnapshot( + status="Pending", + index="0", + start_time=datetime.fromtimestamp(757575), + name="forward_model0", + ) + main_snapshot._fm_step_snapshots["0", "1"] = FMStepSnapshot( + status="Running", + index="1", + start_time=datetime.fromtimestamp(939393), + name="forward_model1", + ) + + update_snapshot = EnsembleSnapshot() + update_snapshot.update_from_event( + ForwardModelStepStart( + real="0", fm_step="2", time=datetime.fromtimestamp(10101010) + ), + main_snapshot, + ) + update_snapshot.update_from_event( + ForwardModelStepRunning(real="0", fm_step="2"), main_snapshot + ) + main_snapshot.merge_snapshot(update_snapshot) + + affected_snapshots = [main_snapshot, update_snapshot] + for snapshot in affected_snapshots: + assert snapshot.get_fm_step(real_id="0", fm_step_id="0")["status"] == "Finished" + assert snapshot.get_fm_step(real_id="0", fm_step_id="0")[ + "end_time" + ] == datetime.fromtimestamp(939393) + + assert snapshot.get_fm_step(real_id="0", fm_step_id="1")["status"] == "Finished" + assert snapshot.get_fm_step(real_id="0", fm_step_id="1")[ + "end_time" + ] == datetime.fromtimestamp(10101010) + + assert snapshot.get_fm_step(real_id="0", fm_step_id="2")["status"] == "Running" + assert "end_time" not in snapshot.get_fm_step(real_id="0", fm_step_id="2") + + assert "Did not get finished event for" in caplog.text diff --git a/tests/ert/unit_tests/test_tracking.py b/tests/ert/unit_tests/test_tracking.py index cf031d8f1f6..1076989bb33 100644 --- a/tests/ert/unit_tests/test_tracking.py +++ b/tests/ert/unit_tests/test_tracking.py @@ -118,7 +118,7 @@ def check_expression(original, path_expression, expected, msg_start): ), pytest.param( "", - ' import os\n if os.getcwd().split("/")[-2].split("-")[1] == "0": sys.exit(1)', + ' import os, sys\n if os.getcwd().split("/")[-2].split("-")[1] == "0": sys.exit(1)', [ ENSEMBLE_SMOOTHER_MODE, "--realizations",