From 97478fa435c02f85dad9c08d2c8093ea2095ec45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Mon, 28 Oct 2024 15:21:32 +0100 Subject: [PATCH] Rename job to step in runner (many more of these to go) --- src/_ert/forward_model_runner/runner.py | 53 ++++++++++--------- .../test_file_reporter.py | 4 +- .../test_forward_model_runner.py | 2 +- .../forward_model_runner/test_job_dispatch.py | 28 +++++----- 4 files changed, 45 insertions(+), 42 deletions(-) diff --git a/src/_ert/forward_model_runner/runner.py b/src/_ert/forward_model_runner/runner.py index 1b76e393fae..bd304f3c7d3 100644 --- a/src/_ert/forward_model_runner/runner.py +++ b/src/_ert/forward_model_runner/runner.py @@ -2,28 +2,29 @@ import json import os from pathlib import Path -from typing import List +from typing import Any, Dict, List from _ert.forward_model_runner.forward_model_step import ForwardModelStep from _ert.forward_model_runner.reporting.message import Checksum, Finish, Init class ForwardModelRunner: - def __init__(self, jobs_data): - self.jobs_data = jobs_data - self.simulation_id = jobs_data.get("run_id") - self.experiment_id = jobs_data.get("experiment_id") - self.ens_id = jobs_data.get("ens_id") - self.real_id = jobs_data.get("real_id") - self.ert_pid = jobs_data.get("ert_pid") - self.global_environment = jobs_data.get("global_environment") - job_data_list = jobs_data["jobList"] + def __init__(self, steps_data: Dict[str, Any]): + self.steps_data = ( + steps_data # On disk, this is called jobs.json for legacy reasons + ) + self.simulation_id = steps_data.get("run_id") + self.experiment_id = steps_data.get("experiment_id") + self.ens_id = steps_data.get("ens_id") + self.real_id = steps_data.get("real_id") + self.ert_pid = steps_data.get("ert_pid") + self.global_environment = steps_data.get("global_environment") if self.simulation_id is not None: os.environ["ERT_RUN_ID"] = self.simulation_id - self.jobs: List[ForwardModelStep] = [] - for index, job_data in enumerate(job_data_list): - self.jobs.append(ForwardModelStep(job_data, index)) + self.steps: List[ForwardModelStep] = [] + for index, step_data in enumerate(steps_data["jobList"]): + self.steps.append(ForwardModelStep(step_data, index)) self._set_environment() @@ -48,15 +49,15 @@ def _populate_checksums(self, manifest): info["error"] = f"Expected file {path} not created by forward model!" return manifest - def run(self, names_of_jobs_to_run): - # if names_of_jobs_to_run, create job_queue which contains jobs that - # are to be run. - if not names_of_jobs_to_run: - job_queue = self.jobs + def run(self, names_of_steps_to_run: List[str]): + if not names_of_steps_to_run: + step_queue = self.steps else: - job_queue = [j for j in self.jobs if j.name() in names_of_jobs_to_run] + step_queue = [ + step for step in self.steps if step.name() in names_of_steps_to_run + ] init_message = Init( - job_queue, + step_queue, self.simulation_id, self.ert_pid, self.ens_id, @@ -64,23 +65,25 @@ def run(self, names_of_jobs_to_run): self.experiment_id, ) - unused = set(names_of_jobs_to_run) - {j.name() for j in job_queue} + unused = set(names_of_steps_to_run) - {step.name() for step in step_queue} if unused: init_message.with_error( f"{unused} does not exist. " - f"Available jobs: {[j.name() for j in self.jobs]}" + f"Available forward_model steps: {[step.name() for step in self.steps]}" ) yield init_message return else: yield init_message - for job in job_queue: - for status_update in job.run(): + for step in step_queue: + for status_update in step.run(): yield status_update if not status_update.success(): yield Checksum(checksum_dict={}, run_path=os.getcwd()) - yield Finish().with_error("Not all jobs completed successfully.") + yield Finish().with_error( + "Not all forward model steps completed successfully." + ) return checksum_dict = self._populate_checksums(self._read_manifest()) diff --git a/tests/ert/unit_tests/forward_model_runner/test_file_reporter.py b/tests/ert/unit_tests/forward_model_runner/test_file_reporter.py index e9f97d5fc72..d64af04d83c 100644 --- a/tests/ert/unit_tests/forward_model_runner/test_file_reporter.py +++ b/tests/ert/unit_tests/forward_model_runner/test_file_reporter.py @@ -116,7 +116,7 @@ def test_report_with_failed_exit_message_argument(reporter): assert "EXIT: 1/massive_failure" in f.readline() with open(ERROR_file, "r", encoding="utf-8") as f: content = "".join(f.readlines()) - assert "fmstep1" in content, "ERROR file missing job" + assert "fmstep1" in content, "ERROR file missing fmstep" assert ( "massive_failure" in content ), "ERROR file missing reason" @@ -199,7 +199,7 @@ def test_old_file_deletion(reporter): @pytest.mark.usefixtures("use_tmpdir") def test_status_file_is_correct(reporter): - """The STATUS file is a file to which we append data about jobs as they + """The STATUS file is a file to which we append data about steps as they are run. So this involves multiple reports, and should be tested as such. See https://github.com/equinor/libres/issues/764 diff --git a/tests/ert/unit_tests/forward_model_runner/test_forward_model_runner.py b/tests/ert/unit_tests/forward_model_runner/test_forward_model_runner.py index b1616d971c2..5f0974791f3 100644 --- a/tests/ert/unit_tests/forward_model_runner/test_forward_model_runner.py +++ b/tests/ert/unit_tests/forward_model_runner/test_forward_model_runner.py @@ -250,7 +250,7 @@ def test_exec_env(): @pytest.mark.usefixtures("use_tmpdir") -def test_env_var_available_inside_job_context(): +def test_env_var_available_inside_step_context(): with open("run_me.py", "w", encoding="utf-8") as f: f.write( """#!/usr/bin/env python\n diff --git a/tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py b/tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py index 58075884bb8..474ff102785 100644 --- a/tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py +++ b/tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py @@ -29,8 +29,8 @@ @pytest.mark.usefixtures("use_tmpdir") -def test_terminate_jobs(): - # Executes it self recursively and sleeps for 100 seconds +def test_terminate_steps(): + # Executes itself recursively and sleeps for 100 seconds with open("dummy_executable", "w", encoding="utf-8") as f: f.write( """#!/usr/bin/env python @@ -46,7 +46,7 @@ def test_terminate_jobs(): executable = os.path.realpath("dummy_executable") os.chmod("dummy_executable", stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) - job_list = { + step_list = { "global_environment": {}, "global_update_path": {}, "jobList": [ @@ -74,7 +74,7 @@ def test_terminate_jobs(): } with open(JOBS_FILE, "w", encoding="utf-8") as f: - f.write(json.dumps(job_list)) + f.write(json.dumps(step_list)) # macOS doesn't provide /usr/bin/setsid, so we roll our own with open("setsid", "w", encoding="utf-8") as f: @@ -166,7 +166,7 @@ def test_job_dispatch_run_subset_specified_as_parameter(): f.write( "#!/usr/bin/env python\n" "import sys, os\n" - 'filename = "job_{}.out".format(sys.argv[1])\n' + 'filename = "step_{}.out".format(sys.argv[1])\n' 'f = open(filename, "w", encoding="utf-8")\n' "f.close()\n" ) @@ -179,7 +179,7 @@ def test_job_dispatch_run_subset_specified_as_parameter(): "global_update_path": {}, "jobList": [ { - "name": "job_A", + "name": "step_A", "executable": executable, "target_file": None, "error_file": None, @@ -197,7 +197,7 @@ def test_job_dispatch_run_subset_specified_as_parameter(): "max_arg": None, }, { - "name": "job_B", + "name": "step_B", "executable": executable, "target_file": None, "error_file": None, @@ -215,7 +215,7 @@ def test_job_dispatch_run_subset_specified_as_parameter(): "max_arg": None, }, { - "name": "job_C", + "name": "step_C", "executable": executable, "target_file": None, "error_file": None, @@ -265,16 +265,16 @@ def test_job_dispatch_run_subset_specified_as_parameter(): sys.executable, job_dispatch_script, os.getcwd(), - "job_B", - "job_C", + "step_B", + "step_C", ] ) job_dispatch_process.wait() - assert not os.path.isfile("job_A.out") - assert os.path.isfile("job_B.out") - assert os.path.isfile("job_C.out") + assert not os.path.isfile("step_A.out") + assert os.path.isfile("step_B.out") + assert os.path.isfile("step_C.out") def test_no_jobs_json_file_raises_IOError(tmp_path): @@ -294,7 +294,7 @@ def test_missing_directory_exits(tmp_path): main(["script.py", str(tmp_path / "non_existent")]) -def test_retry_of_jobs_file_read(unused_tcp_port, tmp_path, monkeypatch, caplog): +def test_retry_of_jobs_json_file_read(unused_tcp_port, tmp_path, monkeypatch, caplog): lock = Lock() lock.acquire() monkeypatch.setattr(_ert.forward_model_runner.cli, "_wait_for_retry", lock.acquire)