Skip to content

Commit

Permalink
Let the Scheduler ensemble be stoppable from evaluator.py
Browse files Browse the repository at this point in the history
For some reason, kill_all_jobs() was not able to kill the tasks,
seemingly because the `await self.returncode` call was blocking.
Solved by "busy waiting" with asyncio.sleep until the returncode is
done, which lets the async code accept the cancellation.
  • Loading branch information
berland committed Dec 13, 2023
1 parent fd4b902 commit c3d9e34
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ async def __call__(
await self.started.wait()

await self._send(State.RUNNING)
while not self.returncode.done():
await asyncio.sleep(0.01)
returncode = await self.returncode
if (
returncode == 0
Expand Down
7 changes: 6 additions & 1 deletion src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from websockets import Headers
from websockets.client import connect

from ert.job_queue.queue import EVTYPE_ENSEMBLE_STOPPED
from ert.job_queue.queue import EVTYPE_ENSEMBLE_CANCELLED, EVTYPE_ENSEMBLE_STOPPED
from ert.scheduler.driver import Driver, JobEvent
from ert.scheduler.job import Job
from ert.scheduler.local_driver import LocalDriver
Expand Down Expand Up @@ -51,6 +51,7 @@ def __init__(self, driver: Optional[Driver] = None) -> None:
self._tasks: MutableMapping[int, asyncio.Task[None]] = {}

self._events: Optional[asyncio.Queue[Any]] = None
self._cancelled = False

self._ee_uri = ""
self._ens_id = ""
Expand All @@ -68,6 +69,7 @@ def add_realization(
self._jobs[real.iens] = Job(self, real)

def kill_all_jobs(self) -> None:
    """Mark the scheduler as cancelled and request cancellation of all tasks.

    Sets ``self._cancelled`` to ``True`` and then calls ``cancel()`` on
    every task currently stored in ``self._tasks``.
    """
    # Record the cancellation first so the flag is already set by the time
    # any of the tasks reacts to being cancelled.
    self._cancelled = True
    for task in self._tasks.values():
        task.cancel()

Expand Down Expand Up @@ -138,6 +140,9 @@ async def execute(
if poller_task:
poller_task.cancel()

if self._cancelled:
return EVTYPE_ENSEMBLE_CANCELLED

return EVTYPE_ENSEMBLE_STOPPED

async def _process_event_queue(self) -> None:
Expand Down
21 changes: 20 additions & 1 deletion tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import contextlib
import os
import shutil
from unittest.mock import patch

import pytest
Expand All @@ -9,10 +10,19 @@
from ert.ensemble_evaluator.config import EvaluatorServerConfig
from ert.ensemble_evaluator.evaluator import EnsembleEvaluator
from ert.ensemble_evaluator.monitor import Monitor
from ert.shared.feature_toggling import FeatureToggling


@pytest.mark.timeout(60)
def test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):
    """Run the legacy-ensemble scenario twice: as-is, then with the scheduler toggle on.

    Both passes share this test's single 60 second timeout.
    """
    # First pass: feature toggles are left untouched.
    _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch)

    # Second pass: switch on the "scheduler" feature toggle; monkeypatch
    # restores the attribute when the test finishes.
    monkeypatch.setattr(FeatureToggling._conf["scheduler"], "is_enabled", True)

    # Wipe and recreate tmpdir so the second pass starts from an empty directory.
    shutil.rmtree(tmpdir)
    tmpdir.mkdir()
    _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch)


def _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):
num_reals = 2
custom_port_range = range(1024, 65535)
with tmpdir.as_cwd():
Expand Down Expand Up @@ -45,11 +55,19 @@ def test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):

@pytest.mark.timeout(60)
def test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):
    """Run the run-and-cancel scenario twice: as-is, then with the scheduler toggle on.

    Both passes share this test's single 60 second timeout.
    """
    # First pass: feature toggles are left untouched.
    _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch)

    # Second pass: switch on the "scheduler" feature toggle; monkeypatch
    # restores the attribute when the test finishes.
    monkeypatch.setattr(FeatureToggling._conf["scheduler"], "is_enabled", True)

    # Wipe and recreate tmpdir so the second pass starts from an empty directory.
    shutil.rmtree(tmpdir)
    tmpdir.mkdir()
    _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch)


def _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):
num_reals = 2
custom_port_range = range(1024, 65535)
with tmpdir.as_cwd():
ensemble = make_ensemble_builder(
monkeypatch, tmpdir, num_reals, 2, job_sleep=30
monkeypatch, tmpdir, num_reals, 2, job_sleep=40
).build()
config = EvaluatorServerConfig(
custom_port_range=custom_port_range,
Expand All @@ -67,6 +85,7 @@ def test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypat
ConnectionClosed
): # monitor throws some variant of CC if dispatcher dies
for _ in mon.track():
# Cancel the ensemble upon the arrival of the first event
if cancel:
mon.signal_cancel()
cancel = False
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def test_cancel(tmp_path: Path, realization):

scheduler_task = asyncio.create_task(sch.execute())

# Wait for the job to start
# Wait for the job to start (i.e. let the file "a" be touched)
await asyncio.sleep(1)

# Kill all jobs and wait for the scheduler to complete
Expand Down

0 comments on commit c3d9e34

Please sign in to comment.