diff --git a/src/ert/ensemble_evaluator/_ensemble.py b/src/ert/ensemble_evaluator/_ensemble.py
index 32c855f2afd..7f1b8223e6c 100644
--- a/src/ert/ensemble_evaluator/_ensemble.py
+++ b/src/ert/ensemble_evaluator/_ensemble.py
@@ -17,7 +17,15 @@
     Union,
 )
 
-from _ert.events import Event, Id, event_from_dict, event_to_json
+from _ert.events import (
+    Event,
+    FMEvent,
+    ForwardModelStepFailure,
+    ForwardModelStepSuccess,
+    Id,
+    event_from_dict,
+    event_to_json,
+)
 from _ert.forward_model_runner.client import Client
 from ert.config import ForwardModelStep, QueueConfig
 from ert.run_arg import RunArg
@@ -142,9 +150,51 @@ def _create_snapshot(self) -> EnsembleSnapshot:
     def get_successful_realizations(self) -> List[int]:
         return self.snapshot.get_successful_realizations()
 
+    def _log_completed_fm_step(
+        self, event: FMEvent, step_snapshot: Optional[FMStepSnapshot]
+    ) -> None:
+        """Log walltime and resource usage of a completed forward model step.
+
+        Args:
+            event: The success or failure event that completed the step.
+            step_snapshot: Snapshot entry for the step as looked up by the
+                caller, or None if no entry was found.
+        """
+        if step_snapshot is None:
+            logger.warning(f"Should log {event}, but there was no step_snapshot")
+            return
+        step_name = step_snapshot.get("name", "")
+        start_time = step_snapshot.get("start_time")
+        cpu_seconds = step_snapshot.get("cpu_seconds")
+        current_memory_usage = step_snapshot.get("current_memory_usage")
+        # Keep walltime numeric: the "=" f-string specifier formats with repr(),
+        # so a str would be logged with quotes (walltime='1.5').
+        if start_time is not None and event.time is not None:
+            walltime = (event.time - start_time).total_seconds()
+        else:
+            # We get here if the Running event is in the same event batch as
+            # the Success event. That means that runtime is close to zero.
+            walltime = 0.0
+        logger.warning(
+            f"{event.event_type} {step_name} "
+            f"{walltime=} "
+            f"{cpu_seconds=} "
+            f"{current_memory_usage=} "
+            f"step_index={event.fm_step} "
+            f"real={event.real} "
+            f"ensemble={event.ensemble}"
+        )
+
     def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:
         snapshot_mutate_event = EnsembleSnapshot()
         for event in events:
+            if isinstance(event, (ForwardModelStepSuccess, ForwardModelStepFailure)):
+                step = (
+                    self.snapshot.reals[event.real]
+                    .get("fm_steps", {})
+                    .get(event.fm_step)
+                )
+                self._log_completed_fm_step(event, step)
             snapshot_mutate_event = snapshot_mutate_event.update_from_event(
                 event, source_snapshot=self.snapshot
             )
diff --git a/src/ert/ensemble_evaluator/snapshot.py b/src/ert/ensemble_evaluator/snapshot.py
index bf468128937..f2027f4e964 100644
--- a/src/ert/ensemble_evaluator/snapshot.py
+++ b/src/ert/ensemble_evaluator/snapshot.py
@@ -335,38 +335,6 @@ def update_from_event(
             elif e_type in {ForwardModelStepSuccess, ForwardModelStepFailure}:
                 end_time = convert_iso8601_to_datetime(timestamp)
 
-                try:
-                    start_time = self._fm_step_snapshots[event.real, event.fm_step].get(
-                        "start_time"
-                    )
-                    cpu_seconds = self._fm_step_snapshots[
-                        event.real, event.fm_step
-                    ].get("cpu_seconds")
-                    fm_step_name = source_snapshot.reals[event.real]["fm_steps"][
-                        event.fm_step
-                    ]["name"]
-                    if start_time is not None:
-                        logger.warning(
-                            f"{event.event_type} {fm_step_name} "
-                            f"walltime={(end_time - start_time).total_seconds()} "
-                            f"cputime={cpu_seconds} "
-                            f"ensemble={event.ensemble} "
-                            f"step_index={event.fm_step} "
-                            f"real={event.real}"
-                        )
-                    else:
-                        logger.warning(
-                            f"Should log fm_step runtime, but start_time was None, "
-                            f"{event.event_type} {fm_step_name=} "
-                            f"endtime={end_time.isoformat()} "
-                            f"cputime={cpu_seconds} "
-                            f"ensemble={event.ensemble} "
-                            f"step_index={event.fm_step} "
-                            f"real={event.real}"
-                        )
-                except BaseException as e:
-                    logger.warning(f"Should log fm_step runtime, but got exception {e}")
-
                 if type(event) is ForwardModelStepFailure:
                     error = event.error_msg if event.error_msg else ""
                 else: