diff --git a/tests/unit_tests/ensemble_evaluator/conftest.py b/tests/unit_tests/ensemble_evaluator/conftest.py
index 5c2957257a9..288a4ba383f 100644
--- a/tests/unit_tests/ensemble_evaluator/conftest.py
+++ b/tests/unit_tests/ensemble_evaluator/conftest.py
@@ -1,7 +1,6 @@
 import json
 import os
 import stat
-from dataclasses import dataclass
 from pathlib import Path
 from unittest.mock import Mock
 
@@ -106,10 +105,6 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0
         )
     )
 
-    @dataclass
-    class RunArg:
-        iens: int
-
     for iens in range(0, num_reals):
         run_path = Path(tmpdir / f"real_{iens}")
         os.mkdir(run_path)
diff --git a/tests/unit_tests/job_queue/conftest.py b/tests/unit_tests/job_queue/conftest.py
new file mode 100644
index 00000000000..21d14da146a
--- /dev/null
+++ b/tests/unit_tests/job_queue/conftest.py
@@ -0,0 +1,43 @@
+import stat
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+
+import ert
+from ert.load_status import LoadStatus
+
+
+@pytest.fixture
+def mock_fm_ok(monkeypatch):
+    fm_ok = MagicMock(return_value=(LoadStatus.LOAD_SUCCESSFUL, ""))
+    monkeypatch.setattr(ert.job_queue.job_queue_node, "forward_model_ok", fm_ok)
+    yield fm_ok
+
+
+@pytest.fixture
+def simple_script(tmp_path):
+    SIMPLE_SCRIPT = """#!/bin/sh
+echo "finished successfully" > STATUS
+"""
+    fout = Path(tmp_path / "job_script")
+    fout.write_text(SIMPLE_SCRIPT, encoding="utf-8")
+    fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
+    yield str(fout)
+
+
+@pytest.fixture
+def failing_script(tmp_path):
+    """
+    This script is susceptible to race conditions. Python works
+    better than sh."""
+    FAILING_SCRIPT = """#!/usr/bin/env python
+import sys
+with open("one_byte_pr_invocation", "a") as f:
+    f.write(".")
+sys.exit(1)
+    """
+    fout = Path(tmp_path / "failing_script")
+    fout.write_text(FAILING_SCRIPT, encoding="utf-8")
+    fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
+    yield str(fout)
diff --git a/tests/unit_tests/job_queue/test_job_queue.py b/tests/unit_tests/job_queue/test_job_queue.py
index 16115faa103..4822b1dd80f 100644
--- a/tests/unit_tests/job_queue/test_job_queue.py
+++ b/tests/unit_tests/job_queue/test_job_queue.py
@@ -1,16 +1,15 @@
 import json
 import stat
 import time
-from dataclasses import dataclass
 from pathlib import Path
 from threading import BoundedSemaphore
 from typing import Any, Callable, Dict, Optional
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
+
+import pytest
 
-import ert.callbacks
 from ert.config import QueueSystem
 from ert.job_queue import Driver, JobQueue, JobQueueNode, JobStatus
-from ert.load_status import LoadStatus
 
 
 def wait_for(
@@ -28,71 +27,46 @@ def wait_for(
     )
 
 
-def dummy_exit_callback(*args):
-    print(args)
-
-
 DUMMY_CONFIG: Dict[str, Any] = {
     "job_script": "job_script.py",
     "num_cpu": 1,
     "job_name": "dummy_job_{}",
     "run_path": "dummy_path_{}",
-    "ok_callback": lambda _, _b: (LoadStatus.LOAD_SUCCESSFUL, ""),
-    "exit_callback": dummy_exit_callback,
 }
 
-SIMPLE_SCRIPT = """#!/usr/bin/env python
-print('hello')
-"""
 
-NEVER_ENDING_SCRIPT = """#!/usr/bin/env python
+@pytest.fixture
+def never_ending_script(tmp_path):
+    NEVER_ENDING_SCRIPT = """#!/usr/bin/env python
 import time
 while True:
     time.sleep(0.5)
-"""
-
-FAILING_SCRIPT = """#!/usr/bin/env python
-import sys
-sys.exit(1)
-"""
-
-
-@dataclass
-class RunArg:
-    iens: int
+    """
+    fout = Path(tmp_path / "never_ending_job_script")
+    fout.write_text(NEVER_ENDING_SCRIPT, encoding="utf-8")
+    fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
+    yield str(fout)
 
 
 def create_local_queue(
-    monkeypatch,
     executable_script: str,
     max_submit: int = 1,
    max_runtime: Optional[int] = None,
     callback_timeout: Optional["Callable[[int], None]"] = None,
 ):
-    monkeypatch.setattr(
-        ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"]
-    )
-    monkeypatch.setattr(
-        JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"]
-    )
-
     driver = Driver(driver_type=QueueSystem.LOCAL)
     job_queue = JobQueue(driver, max_submit=max_submit)
 
-    scriptpath = Path(DUMMY_CONFIG["job_script"])
-    scriptpath.write_text(executable_script, encoding="utf-8")
-    scriptpath.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
-
     for iens in range(10):
         Path(DUMMY_CONFIG["run_path"].format(iens)).mkdir(exist_ok=False)
         job = JobQueueNode(
-            job_script=DUMMY_CONFIG["job_script"],
+            job_script=executable_script,
             job_name=DUMMY_CONFIG["job_name"].format(iens),
             run_path=DUMMY_CONFIG["run_path"].format(iens),
            num_cpu=DUMMY_CONFIG["num_cpu"],
             status_file=job_queue.status_file,
             exit_file=job_queue.exit_file,
-            run_arg=RunArg(iens),
+            run_arg=MagicMock(),
             max_runtime=max_runtime,
             callback_timeout=callback_timeout,
         )
@@ -109,9 +83,9 @@ def start_all(job_queue, sema_pool):
         job = job_queue.fetch_next_waiting()
 
 
-def test_kill_jobs(tmpdir, monkeypatch):
+def test_kill_jobs(tmpdir, monkeypatch, never_ending_script):
     monkeypatch.chdir(tmpdir)
-    job_queue = create_local_queue(monkeypatch, NEVER_ENDING_SCRIPT)
+    job_queue = create_local_queue(never_ending_script)
 
     assert job_queue.queue_size == 10
     assert job_queue.is_active()
@@ -140,9 +114,9 @@ def test_kill_jobs(tmpdir, monkeypatch):
         job.wait_for()
 
 
-def test_add_jobs(tmpdir, monkeypatch):
+def test_add_jobs(tmpdir, monkeypatch, simple_script):
     monkeypatch.chdir(tmpdir)
-    job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT)
+    job_queue = create_local_queue(simple_script)
 
     assert job_queue.queue_size == 10
     assert job_queue.is_active()
@@ -160,9 +134,9 @@ def test_add_jobs(tmpdir, monkeypatch):
         job.wait_for()
 
 
-def test_failing_jobs(tmpdir, monkeypatch):
+def test_failing_jobs(tmpdir, monkeypatch, failing_script):
     monkeypatch.chdir(tmpdir)
-    job_queue = create_local_queue(monkeypatch, FAILING_SCRIPT, max_submit=1)
+    job_queue = create_local_queue(failing_script, max_submit=1)
 
     assert job_queue.queue_size == 10
     assert job_queue.is_active()
@@ -186,20 +160,17 @@
         assert job_queue.snapshot()[iens] == str(JobStatus.FAILED)
 
 
-def test_timeout_jobs(tmpdir, monkeypatch):
+def test_timeout_jobs(tmpdir, monkeypatch, never_ending_script):
     monkeypatch.chdir(tmpdir)
     job_numbers = set()
 
-    def callback(iens):
-        nonlocal job_numbers
-        job_numbers.add(iens)
+    mock_callback = MagicMock()
 
     job_queue = create_local_queue(
-        monkeypatch,
-        NEVER_ENDING_SCRIPT,
+        never_ending_script,
         max_submit=1,
         max_runtime=5,
-        callback_timeout=callback,
+        callback_timeout=mock_callback,
     )
 
     assert job_queue.queue_size == 10
@@ -222,15 +193,15 @@ def callback(iens):
         iens = job_queue._differ.qindex_to_iens(q_index)
         assert job_queue.snapshot()[iens] == str(JobStatus.IS_KILLED)
 
-    assert job_numbers == set(range(10))
+    assert len(mock_callback.mock_calls) == 20
 
     for job in job_queue.job_list:
         job.wait_for()
 
 
-def test_add_dispatch_info(tmpdir, monkeypatch):
+def test_add_dispatch_info(tmpdir, monkeypatch, simple_script):
     monkeypatch.chdir(tmpdir)
-    job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT)
+    job_queue = create_local_queue(simple_script)
     ens_id = "some_id"
     cert = "My very nice cert"
     token = "my_super_secret_token"
@@ -259,9 +230,9 @@ def test_add_dispatch_info(tmpdir, monkeypatch):
         assert (runpath / cert_file).read_text(encoding="utf-8") == cert
 
 
-def test_add_dispatch_info_cert_none(tmpdir, monkeypatch):
+def test_add_dispatch_info_cert_none(tmpdir, monkeypatch, simple_script):
     monkeypatch.chdir(tmpdir)
-    job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT)
+    job_queue = create_local_queue(simple_script)
     ens_id = "some_id"
     dispatch_url = "wss://example.org"
     cert = None
diff --git a/tests/unit_tests/job_queue/test_job_queue_manager.py b/tests/unit_tests/job_queue/test_job_queue_manager.py
index fbf7c963f9b..64979b024da 100644
--- a/tests/unit_tests/job_queue/test_job_queue_manager.py
+++ b/tests/unit_tests/job_queue/test_job_queue_manager.py
@@ -1,62 +1,28 @@
 import os
 import stat
-from dataclasses import dataclass
 from pathlib import Path
 from threading import BoundedSemaphore
-from typing import Callable, List, TypedDict
+from typing import List, TypedDict
+from unittest.mock import MagicMock
 
 import pytest
 
-import ert.callbacks
 from ert.config import QueueSystem
 from ert.job_queue import Driver, JobQueue, JobQueueManager, JobQueueNode, JobStatus
-from ert.load_status import LoadStatus
-
-
-@dataclass
-class RunArg:
-    iens: int
 
 
 class Config(TypedDict):
-    job_script: str
     num_cpu: int
     job_name: str
     run_path: str
-    ok_callback: Callable
-    exit_callback: Callable
-
-
-def dummy_ok_callback(runarg, path):
-    (Path(path) / "OK").write_text("success", encoding="utf-8")
-    return (LoadStatus.LOAD_SUCCESSFUL, "")
-
-
-def dummy_exit_callback(self):
-    Path("ERROR").write_text("failure", encoding="utf-8")
 
 
 DUMMY_CONFIG: Config = {
-    "job_script": "job_script.py",
     "num_cpu": 1,
     "job_name": "dummy_job_{}",
     "run_path": "dummy_path_{}",
-    "ok_callback": dummy_ok_callback,
-    "exit_callback": dummy_exit_callback,
 }
 
-SIMPLE_SCRIPT = """#!/bin/sh
-echo "finished successfully" > STATUS
-"""
-
-# This script is susceptible to race conditions. Python works
-# better than sh.
-FAILING_SCRIPT = """#!/usr/bin/env python
-import sys
-with open("one_byte_pr_invocation", "a") as f:
-    f.write(".")
-sys.exit(1)
-"""
 
 MOCK_BSUB = """#!/bin/sh
 echo "$@" > test.out
@@ -67,55 +33,33 @@ def dummy_exit_callback(self):
 
 
 def create_local_queue(
-    monkeypatch, executable_script: str, max_submit: int = 2, num_realizations: int = 10
+    executable_script: str, max_submit: int = 2, num_realizations: int = 10
 ):
-    monkeypatch.setattr(
-        ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"]
-    )
-    monkeypatch.setattr(
-        JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"]
-    )
-
     driver = Driver(driver_type=QueueSystem.LOCAL)
     job_queue = JobQueue(driver, max_submit=max_submit)
 
-    scriptpath = Path(DUMMY_CONFIG["job_script"])
-    scriptpath.write_text(executable_script, encoding="utf-8")
-    scriptpath.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
-
     for iens in range(num_realizations):
         Path(DUMMY_CONFIG["run_path"].format(iens)).mkdir()
         job = JobQueueNode(
-            job_script=DUMMY_CONFIG["job_script"],
+            job_script=executable_script,
             job_name=DUMMY_CONFIG["job_name"].format(iens),
             run_path=os.path.realpath(DUMMY_CONFIG["run_path"].format(iens)),
             num_cpu=DUMMY_CONFIG["num_cpu"],
             status_file=job_queue.status_file,
             exit_file=job_queue.exit_file,
-            run_arg=RunArg(iens),
-            ensemble_config=Path(DUMMY_CONFIG["run_path"].format(iens)).resolve(),
+            run_arg=MagicMock(),
         )
         job_queue.add_job(job, iens)
 
     return job_queue
 
 
-def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch):
+@pytest.mark.usefixtures("use_tmpdir", "mock_fm_ok")
+def test_num_cpu_submitted_correctly_lsf(tmpdir, simple_script):
     """Assert that num_cpu from the ERT configuration is passed on to the bsub
     command used to submit jobs to LSF"""
-    monkeypatch.setattr(
-        ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"]
-    )
-    monkeypatch.setattr(
-        JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"]
-    )
-    monkeypatch.chdir(tmpdir)
     os.putenv("PATH", os.getcwd() + ":" + os.getenv("PATH"))
     driver = Driver(driver_type=QueueSystem.LSF)
-    script = Path(DUMMY_CONFIG["job_script"])
-    script.write_text(SIMPLE_SCRIPT, encoding="utf-8")
-    script.chmod(stat.S_IRWXU)
-
     bsub = Path("bsub")
     bsub.write_text(MOCK_BSUB, encoding="utf-8")
     bsub.chmod(stat.S_IRWXU)
@@ -125,14 +69,13 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch):
     os.mkdir(DUMMY_CONFIG["run_path"].format(job_id))
 
     job = JobQueueNode(
-        job_script=DUMMY_CONFIG["job_script"],
+        job_script=simple_script,
         job_name=DUMMY_CONFIG["job_name"].format(job_id),
         run_path=os.path.realpath(DUMMY_CONFIG["run_path"].format(job_id)),
         num_cpu=4,
         status_file="STATUS",
         exit_file="ERROR",
-        run_arg=RunArg(iens=job_id),
-        ensemble_config=Path(DUMMY_CONFIG["run_path"].format(job_id)).resolve(),
+        run_arg=MagicMock(),
     )
 
     pool_sema = BoundedSemaphore(value=2)
@@ -153,25 +96,23 @@
     assert found_cpu_arg is True
 
 
-def test_execute_queue(tmpdir, monkeypatch):
+def test_execute_queue(tmpdir, monkeypatch, mock_fm_ok, simple_script):
     monkeypatch.chdir(tmpdir)
-    job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT)
+    job_queue = create_local_queue(simple_script)
     manager = JobQueueManager(job_queue)
     manager.execute_queue()
 
-    for job in job_queue.job_list:
-        assert (Path(job.run_path) / "OK").read_text(encoding="utf-8") == "success"
+    assert len(mock_fm_ok.mock_calls) == len(job_queue.job_list)
 
 
 @pytest.mark.parametrize("max_submit_num", [1, 2, 3])
-def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch):
+def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch, failing_script):
     """Check that the JobQueueManager will submit exactly the maximum number of
     resubmissions in the case of scripts that fail."""
     monkeypatch.chdir(tmpdir)
     num_realizations = 2
     job_queue = create_local_queue(
-        monkeypatch,
-        FAILING_SCRIPT,
+        failing_script,
         max_submit=max_submit_num,
         num_realizations=num_realizations,
     )
@@ -193,11 +134,9 @@
 
 
 @pytest.mark.parametrize("max_submit_num", [1, 2, 3])
-def test_kill_queue(tmpdir, max_submit_num, monkeypatch):
+def test_kill_queue(tmpdir, max_submit_num, monkeypatch, simple_script):
     monkeypatch.chdir(tmpdir)
-    job_queue = create_local_queue(
-        monkeypatch, SIMPLE_SCRIPT, max_submit=max_submit_num
-    )
+    job_queue = create_local_queue(simple_script, max_submit=max_submit_num)
     manager = JobQueueManager(job_queue)
     job_queue.kill_all_jobs()
     manager.execute_queue()
diff --git a/tests/unit_tests/job_queue/test_job_queue_manager_torque.py b/tests/unit_tests/job_queue/test_job_queue_manager_torque.py
index a0b2fd255c2..76a89f78897 100644
--- a/tests/unit_tests/job_queue/test_job_queue_manager_torque.py
+++ b/tests/unit_tests/job_queue/test_job_queue_manager_torque.py
@@ -1,16 +1,14 @@
 import os
 import stat
-from dataclasses import dataclass
 from pathlib import Path
 from threading import BoundedSemaphore
-from typing import Callable, TypedDict
+from typing import TypedDict
+from unittest.mock import MagicMock
 
 import pytest
 
-import ert.job_queue.job_queue_node
 from ert.config import QueueSystem
 from ert.job_queue import Driver, JobQueueNode, JobStatus
-from ert.load_status import LoadStatus
 
 
 @pytest.fixture(name="temp_working_directory")
@@ -22,48 +20,17 @@ def fixture_temp_working_directory(tmpdir, monkeypatch):
 @pytest.fixture(name="dummy_config")
 def fixture_dummy_config():
     return JobConfig(
-        {
-            "job_script": "job_script.py",
-            "num_cpu": 1,
-            "job_name": "dummy_job_{}",
-            "run_path": "dummy_path_{}",
-            "ok_callback": dummy_ok_callback,
-            "exit_callback": dummy_exit_callback,
-        }
+        num_cpu=1,
+        job_name="dummy_job_{}",
+        run_path="dummy_path_{}",
     )
 
 
-@dataclass
-class RunArg:
-    iens: int
-
-
 class JobConfig(TypedDict):
-    job_script: str
     num_cpu: int
     job_name: str
     run_path: str
-    ok_callback: Callable
-    exit_callback: Callable
-
-
-def dummy_ok_callback(runargs, path):
-    (Path(path) / "OK").write_text("success", encoding="utf-8")
-    return (LoadStatus.LOAD_SUCCESSFUL, "")
-
-
-def dummy_exit_callback(*_args):
-    Path("ERROR").write_text("failure", encoding="utf-8")
-
 
-SIMPLE_SCRIPT = """#!/bin/sh
-echo "finished successfully" > STATUS
-"""
-
-FAILING_FORWARD_MODEL = """#!/usr/bin/env python
-import sys
-sys.exit(1)
-"""
 
 MOCK_QSUB = """#!/bin/sh
 echo "torque job submitted" > job_output
@@ -143,30 +110,23 @@ def _deploy_script(scriptname: Path, scripttext: str):
     script.chmod(stat.S_IRWXU)
 
 
-def _build_jobqueuenode(monkeypatch, dummy_config: JobConfig, job_id=0):
-    monkeypatch.setattr(
-        ert.job_queue.job_queue_node, "forward_model_ok", dummy_config["ok_callback"]
-    )
-    monkeypatch.setattr(
-        JobQueueNode, "run_exit_callback", dummy_config["exit_callback"]
-    )
-
+def _build_jobqueuenode(job_script, dummy_config: JobConfig, job_id=0):
     runpath = Path(dummy_config["run_path"].format(job_id))
     runpath.mkdir()
 
     job = JobQueueNode(
-        job_script=dummy_config["job_script"],
+        job_script=job_script,
         job_name=dummy_config["job_name"].format(job_id),
         run_path=os.path.realpath(dummy_config["run_path"].format(job_id)),
         num_cpu=1,
        status_file="STATUS",
         exit_file="ERROR",
-        run_arg=RunArg(iens=job_id),
-        ensemble_config=Path(dummy_config["run_path"].format(job_id)).resolve(),
+        run_arg=MagicMock(),
     )
-    return (job, runpath)
+    return job, runpath
 
 
+@pytest.mark.usefixtures("use_tmpdir")
 @pytest.mark.parametrize(
     "qsub_script, qstat_script",
     [
@@ -183,7 +143,12 @@
     ],
 )
 def test_run_torque_job(
-    monkeypatch, temp_working_directory, dummy_config, qsub_script, qstat_script
+    temp_working_directory,
+    dummy_config,
+    qsub_script,
+    qstat_script,
+    mock_fm_ok,
+    simple_script,
 ):
     """Verify that the torque driver will succeed in submitting and
     monitoring torque jobs even when the Torque commands qsub and qstat
@@ -192,7 +157,6 @@ def test_run_torque_job(
     are flaky, i.e. that the submit and status commands can be retried.
 
     A flaky torque command is a shell script that sometimes but not always
     returns with a non-zero exit code."""
-    _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT)
     _deploy_script("qsub", qsub_script)
     _deploy_script("qstat", qstat_script)
@@ -201,7 +165,7 @@ def test_run_torque_job(
         options=[("QSTAT_CMD", temp_working_directory / "qstat")],
     )
 
-    (job, runpath) = _build_jobqueuenode(monkeypatch, dummy_config)
+    job, runpath = _build_jobqueuenode(simple_script, dummy_config)
     job.run(driver, BoundedSemaphore())
     job.wait_for()
 
@@ -210,24 +174,24 @@ def test_run_torque_job(
     assert Path("job_output").exists()
 
     # The "done" callback:
-    assert (runpath / "OK").read_text(encoding="utf-8") == "success"
+    mock_fm_ok.assert_called()
 
 
+@pytest.mark.usefixtures("use_tmpdir")
 @pytest.mark.parametrize(
     "user_qstat_option, expected_options",
     [("", "-f 10001"), ("-x", "-f -x 10001"), ("-f", "-f -f 10001")],
 )
 def test_that_torque_driver_passes_options_to_qstat(
-    monkeypatch,
     temp_working_directory,
     dummy_config,
     user_qstat_option,
     expected_options,
+    simple_script,
 ):
     """The driver supports setting options to qstat, but the hard-coded
     -f option is always there."""
 
-    _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT)
     _deploy_script("qsub", MOCK_QSUB)
     _deploy_script(
         "qstat",
@@ -245,13 +209,14 @@
         ],
     )
 
-    job, _runpath = _build_jobqueuenode(monkeypatch, dummy_config)
+    job, _runpath = _build_jobqueuenode(simple_script, dummy_config)
     job.run(driver, BoundedSemaphore())
     job.wait_for()
 
     assert Path("qstat_options").read_text(encoding="utf-8").strip() == expected_options
 
 
+@pytest.mark.usefixtures("mock_fm_ok", "use_tmpdir")
 @pytest.mark.parametrize(
     "job_state, exit_status, expected_status",
     [
@@ -264,14 +229,13 @@ def test_that_torque_driver_passes_options_to_qstat(
     ],
 )
 def test_torque_job_status_from_qstat_output(
-    monkeypatch,
     temp_working_directory,
     dummy_config,
     job_state,
     exit_status,
     expected_status,
+    simple_script,
 ):
-    _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT)
     _deploy_script("qsub", MOCK_QSUB)
     _deploy_script(
         "qstat",
@@ -284,7 +248,7 @@ def test_torque_job_status_from_qstat_output(
         options=[("QSTAT_CMD", temp_working_directory / "qstat")],
     )
 
-    job, _runpath = _build_jobqueuenode(monkeypatch, dummy_config)
+    job, _runpath = _build_jobqueuenode(simple_script, dummy_config)
 
     pool_sema = BoundedSemaphore(value=2)
     job.run(driver, pool_sema)
diff --git a/tests/unit_tests/status/test_tracking_integration.py b/tests/unit_tests/status/test_tracking_integration.py
index 298096fbce5..f559638ca46 100644
--- a/tests/unit_tests/status/test_tracking_integration.py
+++ b/tests/unit_tests/status/test_tracking_integration.py
@@ -185,7 +185,8 @@ def test_tracking(
     os.chdir(ert_config.config_path)
     ert = EnKFMain(ert_config)
     experiment_id = storage.create_experiment(
-        ert.ensembleConfig().parameter_configuration
+        parameters=ert.ensembleConfig().parameter_configuration,
+        responses=ert.ensembleConfig().response_configuration,
     )
 
     model = create_model(
diff --git a/tests/unit_tests/test_load_forward_model.py b/tests/unit_tests/test_load_forward_model.py
index 2866b7aa434..006b4778fc4 100644
--- a/tests/unit_tests/test_load_forward_model.py
+++ b/tests/unit_tests/test_load_forward_model.py
@@ -149,9 +149,7 @@ def test_load_forward_model(snake_oil_default_storage):
         ),
     ],
 )
-def test_load_forward_model_summary(
-    summary_configuration, prior_ensemble, expected, caplog
-):
+def test_load_forward_model_summary(summary_configuration, storage, expected, caplog):
     config_text = dedent(
         """
         NUM_REALIZATIONS 1