Skip to content

Commit

Permalink
Fix test_tracking_missing_ecl
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Jan 2, 2024
1 parent abf07da commit 1d90e3c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
26 changes: 20 additions & 6 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None:
self.aborted = asyncio.Event()
self._scheduler: Scheduler = scheduler
self._callback_status_msg: str = ""
self._requested_max_submit: Optional[int] = None

@property
def iens(self) -> int:
Expand Down Expand Up @@ -103,9 +104,11 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:

if callback_status == LoadStatus.LOAD_SUCCESSFUL:
await self._send(State.COMPLETED)
elif callback_status == LoadStatus.TIME_MAP_FAILURE:
await self._send(State.FAILED)
else: # LoadStatus.LOAD_FAILURE
else:
assert callback_status in (
LoadStatus.LOAD_FAILURE,
LoadStatus.TIME_MAP_FAILURE,
)
await self._send(State.FAILED)

else:
Expand All @@ -126,6 +129,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
async def __call__(
self, start: asyncio.Event, sem: asyncio.BoundedSemaphore, max_submit: int = 2
) -> None:
self._requested_max_submit = max_submit
await start.wait()

for attempt in range(max_submit):
Expand Down Expand Up @@ -155,12 +159,22 @@ async def _max_runtime_task(self) -> None:

self.returncode.cancel() # Triggers CancelledError

async def _handle_failure(self) -> None:
self.real.run_arg.ensemble_storage.state_map[
self.iens
] = RealizationStorageState.LOAD_FAILURE
assert self._requested_max_submit is not None
logger.error(
f"Realization: {self.real.run_arg.iens} "
f"failed after reaching max submit ({self._requested_max_submit}):"
f"\n\t{self._callback_status_msg}"
)

async def _send(self, state: State) -> None:
self.state = state
if state in (State.FAILED, State.ABORTED):
self.real.run_arg.ensemble_storage.state_map[
self.iens
] = RealizationStorageState.LOAD_FAILURE
await self._handle_failure()

status = STATE_TO_LEGACY[state]
event = CloudEvent(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def run_sim(start_date):
summary.fwrite()


@pytest.mark.scheduler(skip=True)
@pytest.mark.scheduler()
@pytest.mark.integration_test
def test_tracking_missing_ecl(
tmpdir, source_root, caplog, storage, try_queue_and_scheduler, monkeypatch
Expand Down

0 comments on commit 1d90e3c

Please sign in to comment.