diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index 7dba331d16d..d1d63912eb2 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -63,6 +63,7 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None: self.aborted = asyncio.Event() self._scheduler: Scheduler = scheduler self._callback_status_msg: str = "" + self._requested_max_submit: Optional[int] = None @property def iens(self) -> int: @@ -103,9 +104,11 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: if callback_status == LoadStatus.LOAD_SUCCESSFUL: await self._send(State.COMPLETED) - elif callback_status == LoadStatus.TIME_MAP_FAILURE: - await self._send(State.FAILED) - else: # LoadStatus.LOAD_FAILURE + else: + assert callback_status in ( + LoadStatus.LOAD_FAILURE, + LoadStatus.TIME_MAP_FAILURE, + ) await self._send(State.FAILED) else: @@ -126,6 +129,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: async def __call__( self, start: asyncio.Event, sem: asyncio.BoundedSemaphore, max_submit: int = 2 ) -> None: + self._requested_max_submit = max_submit await start.wait() for attempt in range(max_submit): @@ -155,12 +159,22 @@ async def _max_runtime_task(self) -> None: self.returncode.cancel() # Triggers CancelledError + async def _handle_failure(self) -> None: + self.real.run_arg.ensemble_storage.state_map[ + self.iens + ] = RealizationStorageState.LOAD_FAILURE + assert self._requested_max_submit is not None + logger.error( + f"Realization: {self.real.run_arg.iens} " + f"failed after reaching max submit ({self._requested_max_submit}):" + f"\n\t{self._callback_status_msg}" + ) + async def _send(self, state: State) -> None: self.state = state if state in (State.FAILED, State.ABORTED): - self.real.run_arg.ensemble_storage.state_map[ - self.iens - ] = RealizationStorageState.LOAD_FAILURE + await self._handle_failure() + status = STATE_TO_LEGACY[state] event = CloudEvent( { diff --git a/tests/integration_tests/status/test_tracking_integration.py b/tests/integration_tests/status/test_tracking_integration.py index 04c2feea713..131c8e7bf31 100644 --- a/tests/integration_tests/status/test_tracking_integration.py +++ b/tests/integration_tests/status/test_tracking_integration.py @@ -377,7 +377,7 @@ def run_sim(start_date): summary.fwrite() -@pytest.mark.scheduler(skip=True) +@pytest.mark.scheduler() @pytest.mark.integration_test def test_tracking_missing_ecl( tmpdir, source_root, caplog, storage, try_queue_and_scheduler, monkeypatch