Skip to content

Commit

Permalink
Ensure that workflow runner raises exception on failing STOP_ON_FAIL job
Browse files Browse the repository at this point in the history
  • Loading branch information
Yngve S. Kristiansen committed Sep 19, 2023
1 parent a7b26a0 commit 549d19d
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/ert/config/workflow_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def from_file(cls, config_file: str, name: Optional[str] = None) -> "WorkflowJob
script=str(content_dict.get("SCRIPT")) # type: ignore
if "SCRIPT" in content_dict
else None,
stop_on_fail=content_dict.get("STOP_ON_FAIL", False),
stop_on_fail=content_dict.get("STOP_ON_FAIL")

Check failure on line 93 in src/ert/config/workflow_job.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

No overload variant of "get" of "dict" matches argument type "str"
if "STOP_ON_FAIL" in content_dict
else False,
)

def is_plugin(self) -> bool:
Expand Down
16 changes: 15 additions & 1 deletion src/ert/job_queue/workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,14 @@ def __exit__(
def run(self) -> None:
if self.isRunning():
raise AssertionError("An instance of workflow is already running!")

self._workflow_job = self._workflow_executor.submit(self.run_blocking)

def on_done(future):

Check failure on line 147 in src/ert/job_queue/workflow_runner.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a type annotation
print("2")

self._workflow_job.add_done_callback(on_done)

def run_blocking(self) -> None:
self.__workflow_result = None
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -173,7 +179,15 @@ def run_blocking(self) -> None:
}

if jobrunner.hasFailed():
logger.error(f"Workflow job {jobrunner.name} failed", extra=info)
if jobrunner.job.stop_on_fail:
raise RuntimeError(
f"Workflow job {info['job_name']}"
f" failed with error: {info['stderr']}"
)
else:
logger.error(
f"Workflow job {jobrunner.name} failed", extra=info
)
else:
logger.info(
f"Workflow job {jobrunner.name} completed successfully",

Check failure on line 193 in src/ert/job_queue/workflow_runner.py

View workflow job for this annotation

GitHub Actions / annotate-python-linting

line too long (80 > 79 characters)
Expand Down
42 changes: 42 additions & 0 deletions tests/unit_tests/cli/test_integration_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,3 +895,45 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config):
assert lines[2].strip() == "TheThirdValue"
# now MYVAR now set, so should be expanded inside the value of FOURTH
assert lines[3].strip() == "fourth:foo"


@pytest.mark.usefixtures("use_tmpdir", "copy_poly_case")
def test_that_stop_on_fail_workflow_jobs_stop_ert():
with open("failing_job", "w", encoding="utf-8") as f:
f.write(
dedent(
"""
STOP_ON_FAIL True
INTERNAL False
EXECUTABLE shitty_script.sh
"""
)
)

with open("shitty_script.sh", "w", encoding="utf-8") as s:
s.write(
"""
#!/bin/bash
ekho helo wordl
"""
)

os.chmod("shitty_script.sh", os.stat("shitty_script.sh").st_mode | 0o111)

with open("dump_failing_workflow", "w", encoding="utf-8") as f:
f.write("failjob")

with open("poly.ert", mode="a", encoding="utf-8") as fh:
fh.write(
dedent(
"""
LOAD_WORKFLOW_JOB failing_job failjob
LOAD_WORKFLOW dump_failing_workflow wffail
HOOK_WORKFLOW wffail POST_SIMULATION
"""
)
)

parsed = ert_parser(None, args=[ENSEMBLE_EXPERIMENT_MODE, "poly.ert"])
with pytest.raises(ErtCliError, match="Workflow job failjob failed with error"):
run_cli(parsed)
37 changes: 37 additions & 0 deletions tests/unit_tests/job_queue/test_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,40 @@ def test_workflow_success():
assert os.path.exists("wait_finished_1")

assert workflow_runner.workflowResult()


@pytest.mark.usefixtures("use_tmpdir")
def test_workflow_stops_with_stopping_job():
WorkflowCommon.createExternalDumpJob()
with open("dump_failing_job", "a", encoding="utf-8") as f:
f.write("STOP_ON_FAIL True")

with open("dump_failing_workflow", "w", encoding="utf-8") as f:
f.write("DUMP")

job_failing_dump = WorkflowJob.from_file("dump_failing_job")
assert job_failing_dump.stop_on_fail

workflow = Workflow.from_file(
src_file="dump_failing_workflow",
context=SubstitutionList(),
job_dict={"DUMP": job_failing_dump},
)

runner = WorkflowRunner(workflow)
with pytest.raises(RuntimeError, match="Workflow job dump_failing_job failed"):
runner.run_blocking()

with open("dump_failing_job", "a", encoding="utf-8") as f:
f.write("\nSTOP_ON_FAIL False")

job_successful_dump = WorkflowJob.from_file("dump_failing_job")
assert not job_successful_dump.stop_on_fail
workflow = Workflow.from_file(
src_file="dump_failing_workflow",
context=SubstitutionList(),
job_dict={"DUMP": job_successful_dump},
)

# Expect no error raised
WorkflowRunner(workflow).run_blocking()

0 comments on commit 549d19d

Please sign in to comment.