diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index 469ec46c81c..63b6f9c6ee9 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -444,9 +444,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode: @@ -583,12 +584,17 @@ async def _poll_once_by_bhist( if time.time() - self._bhist_cache_timestamp < self._bhist_required_cache_age: return {} - process = await asyncio.create_subprocess_exec( - self._bhist_cmd, - *[str(job_id) for job_id in missing_job_ids], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) + try: + process = await asyncio.create_subprocess_exec( + str(self._bhist_cmd), + *[str(job_id) for job_id in missing_job_ids], + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except OSError as e: + logger.error(str(e)) + return {} + stdout, stderr = await process.communicate() if process.returncode: logger.error( diff --git a/src/ert/scheduler/openpbs_driver.py b/src/ert/scheduler/openpbs_driver.py index 0c4c0352017..3c964750a09 100644 --- a/src/ert/scheduler/openpbs_driver.py +++ b/src/ert/scheduler/openpbs_driver.py @@ -271,9 +271,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}: # Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but @@ -296,14 +297,20 @@ async def poll(self) -> None: await self._process_job_update(job_id, job) if self._finished_job_ids: - process = await asyncio.create_subprocess_exec( - str(self._qstat_cmd), - "-Efx", - "-Fjson", - *self._finished_job_ids, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) + try: + process = await asyncio.create_subprocess_exec( + str(self._qstat_cmd), + "-Efx", + "-Fjson", + *self._finished_job_ids, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except OSError as e: + logger.error(str(e)) + await asyncio.sleep(self._poll_period) + continue + stdout, stderr = await process.communicate() if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}: # Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but diff --git a/src/ert/scheduler/slurm_driver.py b/src/ert/scheduler/slurm_driver.py index 3d5a4435543..d0cdde3cdc2 100644 --- a/src/ert/scheduler/slurm_driver.py +++ b/src/ert/scheduler/slurm_driver.py @@ -251,9 +251,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode: logger.warning( diff --git a/tests/ert/unit_tests/scheduler/test_generic_driver.py b/tests/ert/unit_tests/scheduler/test_generic_driver.py index 5e797e2827f..7deba26ff7b 100644 --- a/tests/ert/unit_tests/scheduler/test_generic_driver.py +++ b/tests/ert/unit_tests/scheduler/test_generic_driver.py @@ -230,7 +230,7 @@ async def test_execute_with_retry_exits_on_filenotfounderror(driver: Driver, cap @pytest.mark.integration_test -async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog): +async def test_poll_hangs_on_filenotfounderror(driver: Driver, caplog): if isinstance(driver, LocalDriver): pytest.skip("LocalDriver does not poll") caplog.set_level(logging.DEBUG) @@ -240,7 +240,8 @@ async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog): driver._squeue = invalid_cmd driver._jobs = {"foo": "bar"} driver._non_finished_job_ids = ["foo"] - await driver.poll() + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) # We log a retry message every time we retry assert "retry" not in str(caplog.text) diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index 7064748a99b..ac2ae3e83f0 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -4,6 +4,7 @@ import os import random import re +import shutil import stat import string import time @@ -1269,6 +1270,62 @@ def mock_poll_once_by_bhist(*args, **kwargs): assert job_id in driver._bhist_cache +async def test_no_exception_when_bjobs_does_not_exist(caplog, job_name): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + driver = LsfDriver(bjobs_cmd="/bin_foo/not_existing") + driver._poll_period = 0.01 + await driver.submit(0, "sh", "-c", "sleep 1", name=job_name) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "No such file or directory: '/bin_foo/not_existing'" in caplog.text + + +async def test_no_exception_when_no_access_to_bjobs_executable( + not_found_bjobs, caplog, job_name +): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + driver = LsfDriver() + driver._poll_period = 0.01 + Path("bin/bjobs").chmod(0x0) # Modify the bjobs from the fixture + await driver.submit(0, "sh", "-c", "echo", name=job_name) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "Permission denied" in caplog.text + + +async def test_no_exception_when_bhist_fallback_to_bjobs_does_not_exist( + not_found_bjobs, caplog, job_name +): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + driver = LsfDriver(bhist_cmd="/bin_foo/not_existing_bhist") + driver._bhist_required_cache_age = 0 + driver._poll_period = 0.01 + await driver.submit(0, "sh", "-c", "sleep 1", name=job_name) + with pytest.raises(asyncio.TimeoutError): + # driver.poll() should be in an infinite loop in this test + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "No such file or directory: '/bin_foo/not_existing_bhist'" in caplog.text + + +async def test_no_exception_when_no_permission_to_bhist( + not_found_bjobs, caplog, job_name +): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + shutil.copy("bin/bjobs", "bin/bhist") # Reuse dummy script + Path("bin/bhist").chmod(0x0) # Make it non-executable + driver = LsfDriver(bhist_cmd="bin/bhist") + driver._bhist_required_cache_age = 0 + driver._poll_period = 0.01 + await driver.submit(0, "sh", "-c", "sleep 1", name=job_name) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "Permission denied: 'bin/bhist'" in caplog.text + + @pytest.mark.integration_test async def test_that_kill_before_submit_is_finished_works(tmp_path, monkeypatch, caplog): """This test asserts that it is possible to issue a kill command