Rename job to step in runner
(many more of these to go)
berland committed Oct 29, 2024
1 parent 7190b6c commit 97478fa
Showing 4 changed files with 45 additions and 42 deletions.
53 changes: 28 additions & 25 deletions src/_ert/forward_model_runner/runner.py
@@ -2,28 +2,29 @@
import json
import os
from pathlib import Path
from typing import List
from typing import Any, Dict, List

from _ert.forward_model_runner.forward_model_step import ForwardModelStep
from _ert.forward_model_runner.reporting.message import Checksum, Finish, Init


class ForwardModelRunner:
def __init__(self, jobs_data):
self.jobs_data = jobs_data
self.simulation_id = jobs_data.get("run_id")
self.experiment_id = jobs_data.get("experiment_id")
self.ens_id = jobs_data.get("ens_id")
self.real_id = jobs_data.get("real_id")
self.ert_pid = jobs_data.get("ert_pid")
self.global_environment = jobs_data.get("global_environment")
job_data_list = jobs_data["jobList"]
def __init__(self, steps_data: Dict[str, Any]):
self.steps_data = (
steps_data # On disk, this is called jobs.json for legacy reasons
)
self.simulation_id = steps_data.get("run_id")
self.experiment_id = steps_data.get("experiment_id")
self.ens_id = steps_data.get("ens_id")
self.real_id = steps_data.get("real_id")
self.ert_pid = steps_data.get("ert_pid")
self.global_environment = steps_data.get("global_environment")
if self.simulation_id is not None:
os.environ["ERT_RUN_ID"] = self.simulation_id

self.jobs: List[ForwardModelStep] = []
for index, job_data in enumerate(job_data_list):
self.jobs.append(ForwardModelStep(job_data, index))
self.steps: List[ForwardModelStep] = []
for index, step_data in enumerate(steps_data["jobList"]):
self.steps.append(ForwardModelStep(step_data, index))

self._set_environment()
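For orientation, here is a minimal sketch of the steps_data dictionary (the on-disk jobs.json) that the renamed constructor reads. It uses only keys visible in this commit; every value is invented for illustration:

# Hypothetical steps_data / jobs.json contents -- keys taken from this diff,
# values made up for the example.
steps_data = {
    "run_id": "run-0",            # exported as ERT_RUN_ID when present
    "experiment_id": "exp-0",
    "ens_id": "ens-0",
    "real_id": "0",
    "ert_pid": "12345",
    "global_environment": {},
    "jobList": [                  # one dict per forward model step
        {
            "name": "step_A",
            "executable": "/usr/bin/true",
            "target_file": None,
            "error_file": None,
        },
    ],
}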

@@ -48,39 +49,41 @@ def _populate_checksums(self, manifest):
info["error"] = f"Expected file {path} not created by forward model!"
return manifest

def run(self, names_of_jobs_to_run):
# if names_of_jobs_to_run, create job_queue which contains jobs that
# are to be run.
if not names_of_jobs_to_run:
job_queue = self.jobs
def run(self, names_of_steps_to_run: List[str]):
if not names_of_steps_to_run:
step_queue = self.steps
else:
job_queue = [j for j in self.jobs if j.name() in names_of_jobs_to_run]
step_queue = [
step for step in self.steps if step.name() in names_of_steps_to_run
]
init_message = Init(
job_queue,
step_queue,
self.simulation_id,
self.ert_pid,
self.ens_id,
self.real_id,
self.experiment_id,
)

unused = set(names_of_jobs_to_run) - {j.name() for j in job_queue}
unused = set(names_of_steps_to_run) - {step.name() for step in step_queue}
if unused:
init_message.with_error(
f"{unused} does not exist. "
f"Available jobs: {[j.name() for j in self.jobs]}"
f"Available forward_model steps: {[step.name() for step in self.steps]}"
)
yield init_message
return
else:
yield init_message

for job in job_queue:
for status_update in job.run():
for step in step_queue:
for status_update in step.run():
yield status_update
if not status_update.success():
yield Checksum(checksum_dict={}, run_path=os.getcwd())
yield Finish().with_error("Not all jobs completed successfully.")
yield Finish().with_error(
"Not all forward model steps completed successfully."
)
return

checksum_dict = self._populate_checksums(self._read_manifest())
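As the hunk above shows, run() is a generator: it yields an Init message first, per-step status updates while each step executes, and Checksum/Finish messages when it stops. A hedged usage sketch of driving it (the loop itself is not part of this commit, and jobs.json is assumed to have the shape sketched earlier):

import json

from _ert.forward_model_runner.runner import ForwardModelRunner

with open("jobs.json", encoding="utf-8") as f:  # legacy on-disk name, see __init__
    steps_data = json.load(f)

runner = ForwardModelRunner(steps_data)
for message in runner.run([]):  # an empty list means "run every step"
    print(type(message).__name__)  # Init, step status updates, Checksum, Finish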
@@ -116,7 +116,7 @@ def test_report_with_failed_exit_message_argument(reporter):
assert "EXIT: 1/massive_failure" in f.readline()
with open(ERROR_file, "r", encoding="utf-8") as f:
content = "".join(f.readlines())
assert "<job>fmstep1</job>" in content, "ERROR file missing job"
assert "<job>fmstep1</job>" in content, "ERROR file missing fmstep"
assert (
"<reason>massive_failure</reason>" in content
), "ERROR file missing reason"
@@ -199,7 +199,7 @@ def test_old_file_deletion(reporter):

@pytest.mark.usefixtures("use_tmpdir")
def test_status_file_is_correct(reporter):
"""The STATUS file is a file to which we append data about jobs as they
"""The STATUS file is a file to which we append data about steps as they
are run. So this involves multiple reports, and should be tested as
such.
See https://github.com/equinor/libres/issues/764
@@ -250,7 +250,7 @@ def test_exec_env():


@pytest.mark.usefixtures("use_tmpdir")
def test_env_var_available_inside_job_context():
def test_env_var_available_inside_step_context():
with open("run_me.py", "w", encoding="utf-8") as f:
f.write(
"""#!/usr/bin/env python\n
28 changes: 14 additions & 14 deletions tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py
@@ -29,8 +29,8 @@


@pytest.mark.usefixtures("use_tmpdir")
def test_terminate_jobs():
# Executes it self recursively and sleeps for 100 seconds
def test_terminate_steps():
# Executes itself recursively and sleeps for 100 seconds
with open("dummy_executable", "w", encoding="utf-8") as f:
f.write(
"""#!/usr/bin/env python
@@ -46,7 +46,7 @@ def test_terminate_steps():
executable = os.path.realpath("dummy_executable")
os.chmod("dummy_executable", stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)

job_list = {
step_list = {
"global_environment": {},
"global_update_path": {},
"jobList": [
@@ -74,7 +74,7 @@ def test_terminate_steps():
}

with open(JOBS_FILE, "w", encoding="utf-8") as f:
f.write(json.dumps(job_list))
f.write(json.dumps(step_list))

# macOS doesn't provide /usr/bin/setsid, so we roll our own
with open("setsid", "w", encoding="utf-8") as f:
@@ -166,7 +166,7 @@ def test_job_dispatch_run_subset_specified_as_parameter():
f.write(
"#!/usr/bin/env python\n"
"import sys, os\n"
'filename = "job_{}.out".format(sys.argv[1])\n'
'filename = "step_{}.out".format(sys.argv[1])\n'
'f = open(filename, "w", encoding="utf-8")\n'
"f.close()\n"
)
@@ -179,7 +179,7 @@ def test_job_dispatch_run_subset_specified_as_parameter():
"global_update_path": {},
"jobList": [
{
"name": "job_A",
"name": "step_A",
"executable": executable,
"target_file": None,
"error_file": None,
@@ -197,7 +197,7 @@ def test_job_dispatch_run_subset_specified_as_parameter():
"max_arg": None,
},
{
"name": "job_B",
"name": "step_B",
"executable": executable,
"target_file": None,
"error_file": None,
@@ -215,7 +215,7 @@ def test_job_dispatch_run_subset_specified_as_parameter():
"max_arg": None,
},
{
"name": "job_C",
"name": "step_C",
"executable": executable,
"target_file": None,
"error_file": None,
@@ -265,16 +265,16 @@ def test_job_dispatch_run_subset_specified_as_parameter():
sys.executable,
job_dispatch_script,
os.getcwd(),
"job_B",
"job_C",
"step_B",
"step_C",
]
)

job_dispatch_process.wait()

assert not os.path.isfile("job_A.out")
assert os.path.isfile("job_B.out")
assert os.path.isfile("job_C.out")
assert not os.path.isfile("step_A.out")
assert os.path.isfile("step_B.out")
assert os.path.isfile("step_C.out")


def test_no_jobs_json_file_raises_IOError(tmp_path):
@@ -294,7 +294,7 @@ def test_missing_directory_exits(tmp_path):
main(["script.py", str(tmp_path / "non_existent")])


def test_retry_of_jobs_file_read(unused_tcp_port, tmp_path, monkeypatch, caplog):
def test_retry_of_jobs_json_file_read(unused_tcp_port, tmp_path, monkeypatch, caplog):
lock = Lock()
lock.acquire()
monkeypatch.setattr(_ert.forward_model_runner.cli, "_wait_for_retry", lock.acquire)
