From 9b5c9332f6b494fd499872ef73c0012aead5dbdc Mon Sep 17 00:00:00 2001
From: "Yngve S. Kristiansen"
Date: Mon, 28 Oct 2024 12:19:55 +0100
Subject: [PATCH] Add some detached <-> scheduler edge cases

---
Reviewer notes (text in this section is ignored by `git am`):
- wait_for_server: reuse the already None-checked `job_progress` local
  instead of calling context.job_progress(0) a second time; the second call
  could return None under the very race this patch handles.
- test_..._failed_job_removed_from_scheduler: the `any(...)` generator was
  missing `in x`, which made the assertion true for any non-empty log.
- Line counts per hunk are unchanged; the blob ids on the `index` lines and
  the commit id above refer to the pre-review content.

 src/everest/detached/__init__.py | 33 +++++++++++++++----
 tests/everest/test_detached.py   | 55 ++++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 6 deletions(-)

diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py
index 024745a7606..ff2d027618d 100644
--- a/src/everest/detached/__init__.py
+++ b/src/everest/detached/__init__.py
@@ -14,7 +14,7 @@
 from seba_sqlite.exceptions import ObjectNotFoundError
 from seba_sqlite.snapshot import SebaSnapshot
 
-from ert import BatchContext, BatchSimulator
+from ert import BatchContext, BatchSimulator, JobState
 from ert.config import ErtConfig, QueueSystem
 from everest.config import EverestConfig
 from everest.config_keys import ConfigKeys as CK
@@ -180,11 +180,32 @@ def wait_for_server(
             )
         # Job queueing may fail:
         if context is not None and context.has_job_failed(0):
-            path = context.job_progress(0).steps[0].std_err_file
-            for err in extract_errors_from_file(path):
-                update_everserver_status(config, ServerStatus.failed, message=err)
-                logging.error(err)
-            raise SystemExit("Failed to start Everest server.")
+            job_progress = context.job_progress(0)
+
+            if job_progress is not None:
+                path = job_progress.steps[0].std_err_file
+                for err in extract_errors_from_file(path):
+                    update_everserver_status(
+                        config, ServerStatus.failed, message=err
+                    )
+                    logging.error(err)
+                raise SystemExit("Failed to start Everest server.")
+            else:
+                try:
+                    state = context.get_job_state(0)
+
+                    if state == JobState.WAITING:
+                        # Job did fail, but is now in WAITING
+                        logging.error(
+                            "Race condition in wait_for_server, job did fail but is now in WAITING"
+                        )
+                except IndexError as e:
+                    # Job is no longer registered in scheduler
+                    logging.error(
+                        f"Race condition in wait_for_server, failed job removed from scheduler\n{e}"
+                    )
+                    raise SystemExit("Failed to start Everest server.") from e
+
         sleep_time = sleep_time_increment * (2**retry_count)
         time.sleep(sleep_time)
         if server_is_running(config):
diff --git a/tests/everest/test_detached.py b/tests/everest/test_detached.py
index 1ccdce32444..a8ecf4227b8 100644
--- a/tests/everest/test_detached.py
+++ b/tests/everest/test_detached.py
@@ -1,11 +1,13 @@
 import logging
 import os
+import shutil
 from collections import namedtuple
 from unittest.mock import patch
 
 import pytest
 import requests
 
+from ert import JobState
 from ert.config import ErtConfig, QueueSystem
 from ert.storage import open_storage
 from everest.config import EverestConfig
@@ -37,6 +39,7 @@
     SIMULATION_DIR,
 )
 from everest.util import makedirs_if_needed
+from tests.everest.utils import relpath
 
 
 class MockContext:
@@ -183,6 +186,58 @@ def test_wait_for_server(
     assert server_status["message"] == expected_error_msg
 
 
+@patch("everest.detached.server_is_running", return_value=False)
+@pytest.mark.usefixtures("change_to_tmpdir")
+def test_wait_for_handles_failed_job_race_condition_failed_job_to_waiting(
+    server_is_running_mock, caplog
+):
+    shutil.copytree(relpath("test_data", "detached"), ".", dirs_exist_ok=True)
+    config = EverestConfig.load_file("valid_yaml_config.yml")
+
+    class _MockContext(MockContext):
+        @staticmethod
+        def job_progress(*args):
+            return None
+
+        @staticmethod
+        def get_job_state(*args):
+            return JobState.WAITING
+
+    with caplog.at_level(logging.ERROR), pytest.raises(RuntimeError):
+        wait_for_server(config, timeout=1, context=_MockContext())
+
+    assert (
+        "Race condition in wait_for_server, job did fail but is now in WAITING"
+        in caplog.messages
+    )
+
+
+@patch("everest.detached.server_is_running", return_value=False)
+@pytest.mark.usefixtures("change_to_tmpdir")
+def test_wait_for_handles_failed_job_race_condition_failed_job_removed_from_scheduler(
+    server_is_running_mock, caplog
+):
+    shutil.copytree(relpath("test_data", "detached"), ".", dirs_exist_ok=True)
+    config = EverestConfig.load_file("valid_yaml_config.yml")
+
+    class _MockContext(MockContext):
+        @staticmethod
+        def job_progress(*args):
+            return None
+
+        @staticmethod
+        def get_job_state(*args):
+            raise IndexError("Some trackback")
+
+    with caplog.at_level(logging.ERROR), pytest.raises(SystemExit):
+        wait_for_server(config, timeout=1, context=_MockContext())
+
+    assert any(
+        "Race condition in wait_for_server, failed job removed from scheduler" in x
+        for x in caplog.messages
+    )
+
+
 def _get_reference_config():
     everest_config = EverestConfig.load_file("config_minimal.yml")
     reference_config = ErtConfig.read_site_config()