Skip to content

Commit

Permalink
Ignore OSErrors on subprocess call of bjobs and bhist
Browse files Browse the repository at this point in the history
Pretend these kinds of issues are flaky. It is important not to crash on
potentially intermittent failures in code that is rerun every 2 seconds.
  • Loading branch information
berland committed Jan 9, 2025
1 parent c4217b1 commit 78ef92c
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 22 deletions.
22 changes: 14 additions & 8 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,10 @@ async def poll(self) -> None:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
except OSError as e:
logger.error(str(e))
return
await asyncio.sleep(self._poll_period)
continue

stdout, stderr = await process.communicate()
if process.returncode:
Expand Down Expand Up @@ -583,12 +584,17 @@ async def _poll_once_by_bhist(
if time.time() - self._bhist_cache_timestamp < self._bhist_required_cache_age:
return {}

process = await asyncio.create_subprocess_exec(
self._bhist_cmd,
*[str(job_id) for job_id in missing_job_ids],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
process = await asyncio.create_subprocess_exec(
str(self._bhist_cmd),
*[str(job_id) for job_id in missing_job_ids],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except OSError as e:
logger.error(str(e))
return {}

stdout, stderr = await process.communicate()
if process.returncode:
logger.error(
Expand Down
27 changes: 17 additions & 10 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,10 @@ async def poll(self) -> None:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
except OSError as e:
logger.error(str(e))
return
await asyncio.sleep(self._poll_period)
continue
stdout, stderr = await process.communicate()
if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}:
# Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but
Expand All @@ -296,14 +297,20 @@ async def poll(self) -> None:
await self._process_job_update(job_id, job)

if self._finished_job_ids:
process = await asyncio.create_subprocess_exec(
str(self._qstat_cmd),
"-Efx",
"-Fjson",
*self._finished_job_ids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
process = await asyncio.create_subprocess_exec(
str(self._qstat_cmd),
"-Efx",
"-Fjson",
*self._finished_job_ids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except OSError as e:
logger.error(str(e))
await asyncio.sleep(self._poll_period)
continue

stdout, stderr = await process.communicate()
if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}:
# Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but
Expand Down
5 changes: 3 additions & 2 deletions src/ert/scheduler/slurm_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ async def poll(self) -> None:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
except OSError as e:
logger.error(str(e))
return
await asyncio.sleep(self._poll_period)
continue
stdout, stderr = await process.communicate()
if process.returncode:
logger.warning(
Expand Down
5 changes: 3 additions & 2 deletions tests/ert/unit_tests/scheduler/test_generic_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ async def test_execute_with_retry_exits_on_filenotfounderror(driver: Driver, cap


@pytest.mark.integration_test
async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog):
async def test_poll_hangs_on_filenotfounderror(driver: Driver, caplog):
if isinstance(driver, LocalDriver):
pytest.skip("LocalDriver does not poll")
caplog.set_level(logging.DEBUG)
Expand All @@ -240,7 +240,8 @@ async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog):
driver._squeue = invalid_cmd
driver._jobs = {"foo": "bar"}
driver._non_finished_job_ids = ["foo"]
await driver.poll()
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(driver.poll(), timeout=0.1)

# We log a retry message every time we retry
assert "retry" not in str(caplog.text)
Expand Down
57 changes: 57 additions & 0 deletions tests/ert/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import random
import re
import shutil
import stat
import string
import time
Expand Down Expand Up @@ -1269,6 +1270,62 @@ def mock_poll_once_by_bhist(*args, **kwargs):
assert job_id in driver._bhist_cache


async def test_no_exception_when_bjobs_does_not_exist(caplog, job_name):
"""The intent of this test is to ensure the driver will not
go down if the filesystem is temporarily flaky."""
driver = LsfDriver(bjobs_cmd="/bin_foo/not_existing")
driver._poll_period = 0.01
await driver.submit(0, "sh", "-c", "sleep 1", name=job_name)
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(driver.poll(), timeout=0.1)
assert "No such file or directory: '/bin_foo/not_existing'" in caplog.text


async def test_no_exception_when_no_access_to_bjobs_executable(
not_found_bjobs, caplog, job_name
):
"""The intent of this test is to ensure the driver will not
go down if the filesystem is temporarily flaky."""
driver = LsfDriver()
driver._poll_period = 0.01
Path("bin/bjobs").chmod(0x0) # Modify the bjobs from the fixture
await driver.submit(0, "sh", "-c", "echo", name=job_name)
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(driver.poll(), timeout=0.1)
assert "Permission denied" in caplog.text


async def test_no_exception_when_bhist_fallback_to_bjobs_does_not_exist(
not_found_bjobs, caplog, job_name
):
"""The intent of this test is to ensure the driver will not
go down if the filesystem is temporarily flaky."""
driver = LsfDriver(bhist_cmd="/bin_foo/not_existing_bhist")
driver._bhist_required_cache_age = 0
driver._poll_period = 0.01
await driver.submit(0, "sh", "-c", "sleep 1", name=job_name)
with pytest.raises(asyncio.TimeoutError):
# driver.poll() should be in an infinite loop in this test
await asyncio.wait_for(driver.poll(), timeout=0.1)
assert "No such file or directory: '/bin_foo/not_existing_bhist'" in caplog.text


async def test_no_exception_when_no_permission_to_bhist(
not_found_bjobs, caplog, job_name
):
"""The intent of this test is to ensure the driver will not
go down if the filesystem is temporarily flaky."""
shutil.copy("bin/bjobs", "bin/bhist") # Reuse dummy script
Path("bin/bhist").chmod(0x0) # Make it non-executable
driver = LsfDriver(bhist_cmd="bin/bhist")
driver._bhist_required_cache_age = 0
driver._poll_period = 0.01
await driver.submit(0, "sh", "-c", "sleep 1", name=job_name)
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(driver.poll(), timeout=0.1)
assert "Permission denied: 'bin/bhist'" in caplog.text


@pytest.mark.integration_test
async def test_that_kill_before_submit_is_finished_works(tmp_path, monkeypatch, caplog):
"""This test asserts that it is possible to issue a kill command
Expand Down

0 comments on commit 78ef92c

Please sign in to comment.