From 78ef92c37bafb1689e264706e9fc404d422da3a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Thu, 9 Jan 2025 08:52:14 +0100 Subject: [PATCH] Ignore OSErrors on subprocess call of bjobs and bhist Pretend these kinds of issues are flaky. It is important not to crash on potentially intermittent failures in code that is rerun every 2 seconds. --- src/ert/scheduler/lsf_driver.py | 22 ++++--- src/ert/scheduler/openpbs_driver.py | 27 +++++---- src/ert/scheduler/slurm_driver.py | 5 +- .../scheduler/test_generic_driver.py | 5 +- .../unit_tests/scheduler/test_lsf_driver.py | 57 +++++++++++++++++++ 5 files changed, 94 insertions(+), 22 deletions(-) diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index 469ec46c81c..63b6f9c6ee9 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -444,9 +444,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode: @@ -583,12 +584,17 @@ async def _poll_once_by_bhist( if time.time() - self._bhist_cache_timestamp < self._bhist_required_cache_age: return {} - process = await asyncio.create_subprocess_exec( - self._bhist_cmd, - *[str(job_id) for job_id in missing_job_ids], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) + try: + process = await asyncio.create_subprocess_exec( + str(self._bhist_cmd), + *[str(job_id) for job_id in missing_job_ids], + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except OSError as e: + logger.error(str(e)) + return {} + stdout, stderr = await process.communicate() if process.returncode: logger.error( diff --git a/src/ert/scheduler/openpbs_driver.py b/src/ert/scheduler/openpbs_driver.py index 0c4c0352017..3c964750a09 100644 --- a/src/ert/scheduler/openpbs_driver.py +++ b/src/ert/scheduler/openpbs_driver.py @@ -271,9 +271,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}: # Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but @@ -296,14 +297,20 @@ async def poll(self) -> None: await self._process_job_update(job_id, job) if self._finished_job_ids: - process = await asyncio.create_subprocess_exec( - str(self._qstat_cmd), - "-Efx", - "-Fjson", - *self._finished_job_ids, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) + try: + process = await asyncio.create_subprocess_exec( + str(self._qstat_cmd), + "-Efx", + "-Fjson", + *self._finished_job_ids, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except OSError as e: + logger.error(str(e)) + await asyncio.sleep(self._poll_period) + continue + stdout, stderr = await process.communicate() if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}: # Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but diff --git a/src/ert/scheduler/slurm_driver.py b/src/ert/scheduler/slurm_driver.py index 3d5a4435543..d0cdde3cdc2 100644 --- a/src/ert/scheduler/slurm_driver.py +++ b/src/ert/scheduler/slurm_driver.py @@ -251,9 +251,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode: logger.warning( diff --git a/tests/ert/unit_tests/scheduler/test_generic_driver.py b/tests/ert/unit_tests/scheduler/test_generic_driver.py index 5e797e2827f..7deba26ff7b 100644 --- a/tests/ert/unit_tests/scheduler/test_generic_driver.py +++ b/tests/ert/unit_tests/scheduler/test_generic_driver.py @@ -230,7 +230,7 @@ async def test_execute_with_retry_exits_on_filenotfounderror(driver: Driver, cap @pytest.mark.integration_test -async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog): +async def test_poll_hangs_on_filenotfounderror(driver: Driver, caplog): if isinstance(driver, LocalDriver): pytest.skip("LocalDriver does not poll") caplog.set_level(logging.DEBUG) @@ -240,7 +240,8 @@ async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog): driver._squeue = invalid_cmd driver._jobs = {"foo": "bar"} driver._non_finished_job_ids = ["foo"] - await driver.poll() + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) # We log a retry message every time we retry assert "retry" not in str(caplog.text) diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index 7064748a99b..ac2ae3e83f0 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -4,6 +4,7 @@ import os import random import re +import shutil import stat import string import time @@ -1269,6 +1270,62 @@ def mock_poll_once_by_bhist(*args, **kwargs): assert job_id in driver._bhist_cache +async def test_no_exception_when_bjobs_does_not_exist(caplog, job_name): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + driver = LsfDriver(bjobs_cmd="/bin_foo/not_existing") + driver._poll_period = 0.01 + await driver.submit(0, "sh", "-c", "sleep 1", name=job_name) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "No such file or directory: '/bin_foo/not_existing'" in caplog.text + + +async def test_no_exception_when_no_access_to_bjobs_executable( + not_found_bjobs, caplog, job_name +): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + driver = LsfDriver() + driver._poll_period = 0.01 + Path("bin/bjobs").chmod(0x0) # Modify the bjobs from the fixture + await driver.submit(0, "sh", "-c", "echo", name=job_name) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "Permission denied" in caplog.text + + +async def test_no_exception_when_bhist_fallback_to_bjobs_does_not_exist( + not_found_bjobs, caplog, job_name +): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + driver = LsfDriver(bhist_cmd="/bin_foo/not_existing_bhist") + driver._bhist_required_cache_age = 0 + driver._poll_period = 0.01 + await driver.submit(0, "sh", "-c", "sleep 1", name=job_name) + with pytest.raises(asyncio.TimeoutError): + # driver.poll() should be in an infinite loop in this test + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "No such file or directory: '/bin_foo/not_existing_bhist'" in caplog.text + + +async def test_no_exception_when_no_permission_to_bhist( + not_found_bjobs, caplog, job_name +): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + shutil.copy("bin/bjobs", "bin/bhist") # Reuse dummy script + Path("bin/bhist").chmod(0x0) # Make it non-executable + driver = LsfDriver(bhist_cmd="bin/bhist") + driver._bhist_required_cache_age = 0 + driver._poll_period = 0.01 + await driver.submit(0, "sh", "-c", "sleep 1", name=job_name) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "Permission denied: 'bin/bhist'" in caplog.text + + @pytest.mark.integration_test async def test_that_kill_before_submit_is_finished_works(tmp_path, monkeypatch, caplog): """This test asserts that it is possible to issue a kill command