Skip to content

Commit

Permalink
Show error when everest fm_step reaches MAX_RUNTIME
Browse files Browse the repository at this point in the history
Also log the error in forward_models.log
  • Loading branch information
DanSava committed Dec 16, 2024
1 parent d875982 commit 8e0604b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 19 deletions.
9 changes: 6 additions & 3 deletions src/ert/run_models/everest_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,25 +304,27 @@ def _handle_errors(
realization: str,
fm_name: str,
error_path: str,
fm_running_err: str,
) -> None:
fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}"
fm_logger = logging.getLogger("forward_models")
with open(error_path, encoding="utf-8") as errors:
error_str = errors.read()

error_str = error_str or fm_running_err
error_hash = hash(error_str)
err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format(
batch, realization, simulation, fm_name, "Error: {}\n {}"
batch, realization, simulation, fm_name, "\n Error: {} ID:{}"
)

if error_hash not in self._fm_errors:
error_id = len(self._fm_errors)
fm_logger.error(err_msg.format(error_id, error_str))
fm_logger.error(err_msg.format(error_str, error_id))
self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}})
elif fm_id not in self._fm_errors[error_hash]["ids"]:
self._fm_errors[error_hash]["ids"].append(fm_id)
error_id = self._fm_errors[error_hash]["error_id"]
fm_logger.error(err_msg.format(error_id, ""))
fm_logger.error(err_msg.format("Already reported as", error_id))

def _delete_runpath(self, run_args: list[RunArg]) -> None:
logging.getLogger(EVEREST).debug("Simulation callback called")
Expand Down Expand Up @@ -730,6 +732,7 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus:
realization=realization,
fm_name=fm_step.get("name", "Unknwon"), # type: ignore
error_path=fm_step.get("stderr", ""), # type: ignore
fm_running_err=fm_step.get("error", ""), # type: ignore
)
jobs_progress.append(jobs)

Expand Down
18 changes: 7 additions & 11 deletions src/everest/detached/jobs/everserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,17 +398,13 @@ def _failed_realizations_messages(shared_data):
messages = [OPT_FAILURE_REALIZATIONS]
failed = shared_data[SIM_PROGRESS_ENDPOINT]["status"]["failed"]
if failed > 0:
# Find the set of jobs that failed. To keep the order in which they
# are found in the queue, use a dict as sets are not ordered.
failed_jobs = dict.fromkeys(
job["name"]
for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]
for job in queue
if job["status"] == JOB_FAILURE
).keys()
messages.append(
"{} job failures caused by: {}".format(failed, ", ".join(failed_jobs))
)
# Report each unique pair of failed job name and error
for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]:
for job in queue:
if job["status"] == JOB_FAILURE:
err_msg = f"{job['name']} Failed with: {job.get('error', '')}"
if err_msg not in messages:
messages.append(err_msg)
return messages


Expand Down
64 changes: 59 additions & 5 deletions tests/everest/test_everserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from unittest.mock import patch

import pytest
from ropt.enums import OptimizerExitCode
from seba_sqlite.snapshot import SebaSnapshot

Expand Down Expand Up @@ -52,6 +53,14 @@ def set_shared_status(*args, progress, shared_data):
}


def _add_snippet(file_path, snippet, position):
with open(file_path, encoding="utf-8") as file:
lines = file.readlines()
lines.insert(position - 1, snippet + "\n")
with open(file_path, "w", encoding="utf-8") as file:
file.writelines(lines)


def test_certificate_generation(copy_math_func_test_data_to_tmp):
config = EverestConfig.load_file("config_minimal.yml")
cert, key, pw = everserver._generate_certificate(
Expand Down Expand Up @@ -170,12 +179,12 @@ def test_everserver_status_running_complete(
set_shared_status,
progress=[
[
{"name": "job1", "status": JOB_FAILURE},
{"name": "job1", "status": JOB_FAILURE},
{"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 1"},
{"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 2"},
],
[
{"name": "job2", "status": JOB_SUCCESS},
{"name": "job2", "status": JOB_FAILURE},
{"name": "job2", "status": JOB_SUCCESS, "error": ""},
{"name": "job2", "status": JOB_FAILURE, "error": "job 2 error 1"},
],
],
),
Expand All @@ -193,7 +202,9 @@ def test_everserver_status_failed_job(
# The server should fail and store a user-friendly message.
assert status["status"] == ServerStatus.failed
assert OPT_FAILURE_REALIZATIONS in status["message"]
assert "3 job failures caused by: job1, job2" in status["message"]
assert "job1 Failed with: job 1 error 1" in status["message"]
assert "job1 Failed with: job 1 error 2" in status["message"]
assert "job2 Failed with: job 2 error 1" in status["message"]


@patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
Expand Down Expand Up @@ -271,3 +282,46 @@ def test_everserver_status_max_batch_num(
filter_out_gradient=False, batches=None
)
assert {data.batch for data in snapshot.simulation_data} == {0}


@patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
@patch("everest.detached.jobs.everserver._configure_loggers")
@patch("everest.detached.jobs.everserver._generate_authentication")
@patch(
"everest.detached.jobs.everserver._generate_certificate",
return_value=(None, None, None),
)
@patch(
"everest.detached.jobs.everserver._find_open_port",
return_value=42,
)
@patch("everest.detached.jobs.everserver._write_hostfile")
@patch("everest.detached.jobs.everserver._everserver_thread")
def test_everserver_status_contains_max_runtime_failure(
_1, _2, _3, _4, _5, _6, copy_math_func_test_data_to_tmp,
):
config_file = "config_minimal.yml"
# Add sleep to distance3 job
_add_snippet(
file_path="jobs/distance3.py",
snippet=" import time\n time.sleep(5)\n",
position=21,
)
# Add 1 second max runtime
_add_snippet(
file_path="config_minimal.yml",
snippet="\nsimulator:\n max_runtime: 2\n",
position=1,
)
config = EverestConfig.load_file(config_file)
everserver.main()
status = everserver_status(
ServerConfig.get_everserver_status_path(config.output_dir)
)

assert status["status"] == ServerStatus.failed
print(status["message"])
assert (
"distance3 Failed with: The run is cancelled due to reaching MAX_RUNTIME"
in status["message"]
)

0 comments on commit 8e0604b

Please sign in to comment.