Skip to content

Commit

Permalink
Add some detached <-> scheduler edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
yngve-sk authored Oct 28, 2024
1 parent 9817d75 commit 9b5c933
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 6 deletions.
33 changes: 27 additions & 6 deletions src/everest/detached/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from seba_sqlite.exceptions import ObjectNotFoundError
from seba_sqlite.snapshot import SebaSnapshot

from ert import BatchContext, BatchSimulator
from ert import BatchContext, BatchSimulator, JobState
from ert.config import ErtConfig, QueueSystem
from everest.config import EverestConfig
from everest.config_keys import ConfigKeys as CK
Expand Down Expand Up @@ -180,11 +180,32 @@ def wait_for_server(
)
# Job queueing may fail:
if context is not None and context.has_job_failed(0):
path = context.job_progress(0).steps[0].std_err_file
for err in extract_errors_from_file(path):
update_everserver_status(config, ServerStatus.failed, message=err)
logging.error(err)
raise SystemExit("Failed to start Everest server.")
job_progress = context.job_progress(0)

if job_progress is not None:
path = context.job_progress(0).steps[0].std_err_file
for err in extract_errors_from_file(path):
update_everserver_status(
config, ServerStatus.failed, message=err
)
logging.error(err)
raise SystemExit("Failed to start Everest server.")
else:
try:
state = context.get_job_state(0)

if state == JobState.WAITING:
# Job did fail, but is now in WAITING
logging.error(
"Race condition in wait_for_server, job did fail but is now in WAITING"
)
except IndexError as e:
# Job is no longer registered in scheduler
logging.error(
f"Race condition in wait_for_server, failed job removed from scheduler\n{e}"
)
raise SystemExit("Failed to start Everest server.") from e

sleep_time = sleep_time_increment * (2**retry_count)
time.sleep(sleep_time)
if server_is_running(config):
Expand Down
55 changes: 55 additions & 0 deletions tests/everest/test_detached.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging
import os
import shutil
from collections import namedtuple
from unittest.mock import patch

import pytest
import requests

from ert import JobState
from ert.config import ErtConfig, QueueSystem
from ert.storage import open_storage
from everest.config import EverestConfig
Expand Down Expand Up @@ -37,6 +39,7 @@
SIMULATION_DIR,
)
from everest.util import makedirs_if_needed
from tests.everest.utils import relpath


class MockContext:
Expand Down Expand Up @@ -183,6 +186,58 @@ def test_wait_for_server(
assert server_status["message"] == expected_error_msg


@patch("everest.detached.server_is_running", return_value=False)
@pytest.mark.usefixtures("change_to_tmpdir")
def test_wait_for_handles_failed_job_race_condition_failed_job_to_waiting(
    server_is_running_mock, caplog
):
    """Check the "failed job is now WAITING" race is logged by wait_for_server.

    The stub context returns no job progress and reports the job state as
    ``JobState.WAITING``. With ``server_is_running`` patched to always return
    ``False``, the call is expected to end in ``RuntimeError`` after the race
    condition has been logged at ERROR level.
    """
    expected_log = (
        "Race condition in wait_for_server, job did fail but is now in WAITING"
    )

    class _WaitingAfterFailureContext(MockContext):
        # No progress information is available for the job ...
        @staticmethod
        def job_progress(*args):
            return None

        # ... and the scheduler reports it as WAITING.
        @staticmethod
        def get_job_state(*args):
            return JobState.WAITING

    # Populate the temporary working directory with the detached test data.
    shutil.copytree(relpath("test_data", "detached"), ".", dirs_exist_ok=True)
    everest_config = EverestConfig.load_file("valid_yaml_config.yml")

    with caplog.at_level(logging.ERROR):
        with pytest.raises(RuntimeError):
            wait_for_server(
                everest_config, timeout=1, context=_WaitingAfterFailureContext()
            )

    # The message must appear verbatim as one of the captured log messages.
    assert expected_log in caplog.messages


@patch("everest.detached.server_is_running", return_value=False)
@pytest.mark.usefixtures("change_to_tmpdir")
def test_wait_for_handles_failed_job_race_condition_failed_job_removed_from_scheduler(
    server_is_running_mock, caplog
):
    """Check the "failed job removed from scheduler" race aborts with SystemExit.

    The stub context returns no job progress and raises ``IndexError`` from
    ``get_job_state``. ``wait_for_server`` is expected to raise ``SystemExit``
    and to log a message containing the race-condition text at ERROR level.
    """
    # Populate the temporary working directory with the detached test data.
    shutil.copytree(relpath("test_data", "detached"), ".", dirs_exist_ok=True)
    config = EverestConfig.load_file("valid_yaml_config.yml")

    class _MockContext(MockContext):
        # No progress information is available for the job ...
        @staticmethod
        def job_progress(*args):
            return None

        # ... and querying its state fails with IndexError.
        @staticmethod
        def get_job_state(*args):
            raise IndexError("Some trackback")

    with caplog.at_level(logging.ERROR), pytest.raises(SystemExit):
        wait_for_server(config, timeout=1, context=_MockContext())

    # Substring match: the logged message has the exception text appended.
    # The membership test (`in message`) is essential; without it the
    # generator yields the non-empty literal itself, which is always truthy,
    # and the assertion would pass for any logged message.
    assert any(
        "Race condition in wait_for_server, failed job removed from scheduler"
        in message
        for message in caplog.messages
    )


def _get_reference_config():
everest_config = EverestConfig.load_file("config_minimal.yml")
reference_config = ErtConfig.read_site_config()
Expand Down

0 comments on commit 9b5c933

Please sign in to comment.