From 549d19dd5852d8e57571695c65f73e67a8e9f914 Mon Sep 17 00:00:00 2001
From: "Yngve S. Kristiansen"
Date: Mon, 18 Sep 2023 15:04:11 +0200
Subject: [PATCH] Ensure that workflow runner raises exception on failing
 STOP_ON_FAIL job

A workflow job configured with STOP_ON_FAIL now aborts the running
workflow: WorkflowRunner.run_blocking() raises a RuntimeError carrying
the job name and its stderr instead of merely logging the failure.
Jobs without STOP_ON_FAIL keep the previous log-and-continue behaviour.

Review fixes folded in:
- dropped a leftover debug callback (print("2")) that had been attached
  to the workflow future in WorkflowRunner.run()
- the integration-test helper script is now written through
  textwrap.dedent() with a backslash-continued opening quote so the
  "#!/bin/bash" shebang lands on the first line of the file, and the
  script is renamed to failing_script.sh
---
 src/ert/config/workflow_job.py                     |  4 +-
 src/ert/job_queue/workflow_runner.py               | 10 +++-
 tests/unit_tests/cli/test_integration_cli.py       | 46 ++++++++++++++++++++
 tests/unit_tests/job_queue/test_workflow_runner.py | 37 ++++++++++++++
 4 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/src/ert/config/workflow_job.py b/src/ert/config/workflow_job.py
index 29f310ed05a..00b40cc72ae 100644
--- a/src/ert/config/workflow_job.py
+++ b/src/ert/config/workflow_job.py
@@ -90,7 +90,9 @@ def from_file(cls, config_file: str, name: Optional[str] = None) -> "WorkflowJob
             script=str(content_dict.get("SCRIPT"))  # type: ignore
             if "SCRIPT" in content_dict
             else None,
-            stop_on_fail=content_dict.get("STOP_ON_FAIL", False),
+            stop_on_fail=content_dict.get("STOP_ON_FAIL")
+            if "STOP_ON_FAIL" in content_dict
+            else False,
         )
 
     def is_plugin(self) -> bool:
diff --git a/src/ert/job_queue/workflow_runner.py b/src/ert/job_queue/workflow_runner.py
index 030910a4396..31a3967e93c 100644
--- a/src/ert/job_queue/workflow_runner.py
+++ b/src/ert/job_queue/workflow_runner.py
@@ -173,7 +173,15 @@ def run_blocking(self) -> None:
             }
 
             if jobrunner.hasFailed():
-                logger.error(f"Workflow job {jobrunner.name} failed", extra=info)
+                if jobrunner.job.stop_on_fail:
+                    raise RuntimeError(
+                        f"Workflow job {info['job_name']}"
+                        f" failed with error: {info['stderr']}"
+                    )
+                else:
+                    logger.error(
+                        f"Workflow job {jobrunner.name} failed", extra=info
+                    )
             else:
                 logger.info(
                     f"Workflow job {jobrunner.name} completed successfully",
diff --git a/tests/unit_tests/cli/test_integration_cli.py b/tests/unit_tests/cli/test_integration_cli.py
index bdb63d6647d..82fb8171254 100644
--- a/tests/unit_tests/cli/test_integration_cli.py
+++ b/tests/unit_tests/cli/test_integration_cli.py
@@ -895,3 +895,49 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config):
     assert lines[2].strip() == "TheThirdValue"
     # now MYVAR now set, so should be expanded inside the value of FOURTH
     assert lines[3].strip() == "fourth:foo"
+
+
+@pytest.mark.usefixtures("use_tmpdir", "copy_poly_case")
+def test_that_stop_on_fail_workflow_jobs_stop_ert():
+    with open("failing_job", "w", encoding="utf-8") as f:
+        f.write(
+            dedent(
+                """
+                STOP_ON_FAIL True
+                INTERNAL False
+                EXECUTABLE failing_script.sh
+                """
+            )
+        )
+
+    # Write the script via dedent() so the shebang is the first line of
+    # the file; "ekho" is an intentional typo that makes the job fail.
+    with open("failing_script.sh", "w", encoding="utf-8") as s:
+        s.write(
+            dedent(
+                """\
+                #!/bin/bash
+                ekho helo wordl
+                """
+            )
+        )
+
+    os.chmod("failing_script.sh", os.stat("failing_script.sh").st_mode | 0o111)
+
+    with open("dump_failing_workflow", "w", encoding="utf-8") as f:
+        f.write("failjob")
+
+    with open("poly.ert", mode="a", encoding="utf-8") as fh:
+        fh.write(
+            dedent(
+                """
+                LOAD_WORKFLOW_JOB failing_job failjob
+                LOAD_WORKFLOW dump_failing_workflow wffail
+                HOOK_WORKFLOW wffail POST_SIMULATION
+                """
+            )
+        )
+
+    parsed = ert_parser(None, args=[ENSEMBLE_EXPERIMENT_MODE, "poly.ert"])
+    with pytest.raises(ErtCliError, match="Workflow job failjob failed with error"):
+        run_cli(parsed)
diff --git a/tests/unit_tests/job_queue/test_workflow_runner.py b/tests/unit_tests/job_queue/test_workflow_runner.py
index fff6d5e1adc..ae228d9d8be 100644
--- a/tests/unit_tests/job_queue/test_workflow_runner.py
+++ b/tests/unit_tests/job_queue/test_workflow_runner.py
@@ -137,3 +137,40 @@ def test_workflow_success():
     assert os.path.exists("wait_finished_1")
 
     assert workflow_runner.workflowResult()
+
+
+@pytest.mark.usefixtures("use_tmpdir")
+def test_workflow_stops_with_stopping_job():
+    WorkflowCommon.createExternalDumpJob()
+    with open("dump_failing_job", "a", encoding="utf-8") as f:
+        f.write("STOP_ON_FAIL True")
+
+    with open("dump_failing_workflow", "w", encoding="utf-8") as f:
+        f.write("DUMP")
+
+    job_failing_dump = WorkflowJob.from_file("dump_failing_job")
+    assert job_failing_dump.stop_on_fail
+
+    workflow = Workflow.from_file(
+        src_file="dump_failing_workflow",
+        context=SubstitutionList(),
+        job_dict={"DUMP": job_failing_dump},
+    )
+
+    runner = WorkflowRunner(workflow)
+    with pytest.raises(RuntimeError, match="Workflow job dump_failing_job failed"):
+        runner.run_blocking()
+
+    with open("dump_failing_job", "a", encoding="utf-8") as f:
+        f.write("\nSTOP_ON_FAIL False")
+
+    job_successful_dump = WorkflowJob.from_file("dump_failing_job")
+    assert not job_successful_dump.stop_on_fail
+    workflow = Workflow.from_file(
+        src_file="dump_failing_workflow",
+        context=SubstitutionList(),
+        job_dict={"DUMP": job_successful_dump},
+    )
+
+    # Expect no error raised
+    WorkflowRunner(workflow).run_blocking()