diff --git a/src/ert/scheduler/driver.py b/src/ert/scheduler/driver.py index 4d2f8f5b4e1..f61719e8339 100644 --- a/src/ert/scheduler/driver.py +++ b/src/ert/scheduler/driver.py @@ -83,7 +83,8 @@ async def _execute_with_retry( total_attempts: int = 1, retry_interval: float = 1.0, driverlogger: Optional[logging.Logger] = None, - exit_on_msgs: Iterable[str] = (), + return_on_msgs: Iterable[str] = (), + error_on_msgs: Iterable[str] = (), log_to_debug: Optional[bool] = True, ) -> Tuple[bool, str]: _logger = driverlogger or logging.getLogger(__name__) @@ -117,11 +118,16 @@ async def _execute_with_retry( f'Command "{shlex.join(cmd_with_args)}" succeeded with {outputs}' ) return True, stdout.decode(errors="ignore").strip() - elif exit_on_msgs and any( - exit_on_msg in stderr.decode(errors="ignore") - for exit_on_msg in exit_on_msgs + elif return_on_msgs and any( + return_on_msg in stderr.decode(errors="ignore") + for return_on_msg in return_on_msgs ): return True, stderr.decode(errors="ignore").strip() + elif error_on_msgs and any( + error_on_msg in stderr.decode(errors="ignore") + for error_on_msg in error_on_msgs + ): + return False, stderr.decode(errors="ignore").strip() elif process.returncode in retry_codes: error_message = outputs elif process.returncode in accept_codes: diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index ca0c0cb344f..d2abcd1618a 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -94,6 +94,7 @@ class RunningJob: LSF_INFO_JSON_FILENAME = "lsf_info.json" FLAKY_SSH_RETURNCODE = 255 JOB_ALREADY_FINISHED_BKILL_MSG = "Job has already finished" +BSUB_FAILURE_MESSAGES = ("Job not submitted",) def _parse_jobs_dict(jobs: Mapping[str, JobState]) -> dict[str, AnyJob]: @@ -334,6 +335,7 @@ async def submit( retry_codes=(FLAKY_SSH_RETURNCODE,), total_attempts=self._bsub_retries, retry_interval=self._sleep_time_between_cmd_retries, + error_on_msgs=BSUB_FAILURE_MESSAGES, ) if not process_success: self._job_error_message_by_iens[iens] = process_message @@ -386,7 +388,7 @@ async def kill(self, iens: int) -> None: retry_codes=(FLAKY_SSH_RETURNCODE,), total_attempts=3, retry_interval=self._sleep_time_between_cmd_retries, - exit_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG), + return_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG), ) await asyncio.create_subprocess_shell( f"sleep {self._sleep_time_between_bkills}; {self._bkill_cmd} -s SIGKILL {job_id}", diff --git a/tests/unit_tests/scheduler/test_lsf_driver.py b/tests/unit_tests/scheduler/test_lsf_driver.py index aa8c3310d6b..4a1a85ce0eb 100644 --- a/tests/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/unit_tests/scheduler/test_lsf_driver.py @@ -568,6 +568,45 @@ async def test_that_bsub_will_retry_and_fail( await driver.submit(0, "sleep 10") +@pytest.mark.parametrize( + ("exit_code, error_msg"), + [ + # All these have been manually obtained on the command line by perturbing the command arguments to bsub: + (255, "No such queue. Job not submitted"), + (255, "Too many processors requested. Job not submitted."), + (255, 'Error near "select" : duplicate section. Job not submitted.'), + ( + 255, + "Error in select section: Expected number, string, " + 'name, or "(" but found end of section. Job not submitted.', + ), + ( + 255, + "Error with :" + " '&' cannot be used in the resource requirement section. Job not submitted.", + ), + (255, "Error in rusage section. Job not submitted."), + (255, "Job not submitted."), + ], +) +async def test_that_bsub_will_fail_without_retries( + monkeypatch, tmp_path, exit_code, error_msg +): + monkeypatch.chdir(tmp_path) + bin_path = Path("bin") + bin_path.mkdir() + monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}") + bsub_path = bin_path / "bsub" + bsub_path.write_text( + f'#!/bin/sh\necho . >> bsubcalls\necho "{error_msg}" >&2\nexit {exit_code}' + ) + bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC) + driver = LsfDriver() + with pytest.raises(RuntimeError): + await driver.submit(0, "sleep 10") + assert len(Path("bsubcalls").read_text(encoding="utf-8").strip()) == 1 + + @pytest.mark.parametrize( ("exit_code, error_msg"), [