Skip to content

Commit

Permalink
Log LSF execution host to Azure
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas-el authored Oct 2, 2024
1 parent 5d4205d commit 398ce6a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 16 deletions.
29 changes: 25 additions & 4 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ class JobData:
iens: int
job_state: AnyJob
submitted_timestamp: float
exec_hosts: str = "-"


def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
data: Dict[str, JobState] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 2:
job_id, job_state = tokens
if len(tokens) == 3:
job_id, job_state, _ = tokens
if job_state not in get_args(JobState):
logger.error(
f"Unknown state {job_state} obtained from "
Expand All @@ -127,6 +128,16 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
return data


def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]:
    """Map each job id found in ``bjobs`` output to its exec-host field.

    Every line is split on ``^``. Only lines with exactly three fields
    (job id, state, exec host) contribute; any other line is skipped.
    The exec-host field is returned verbatim. If a job id occurs on
    several lines, the last occurrence wins.
    """
    rows = (row.split(sep="^") for row in bjobs_output.splitlines())
    return {fields[0]: fields[2] for fields in rows if len(fields) == 3}


def build_resource_requirement_string(
exclude_hosts: Sequence[str],
realization_memory: int,
Expand Down Expand Up @@ -423,7 +434,7 @@ async def poll(self) -> None:
str(self._bjobs_cmd),
"-noheader",
"-o",
"jobid stat delimiter='^'",
"jobid stat exec_host delimiter='^'",
*current_jobids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
Expand All @@ -440,6 +451,9 @@ async def poll(self) -> None:
f"bjobs gave returncode {process.returncode} and error {stderr.decode()}"
)
bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore")))
self.update_and_log_exec_hosts(
parse_bjobs_exec_hosts(stdout.decode(errors="ignore"))
)

job_ids_found_in_bjobs_output = set(bjobs_states.keys())
if (
Expand Down Expand Up @@ -491,7 +505,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed")
exit_code = await self._get_exit_code(job_id)
event = FinishedEvent(iens=iens, returncode=exit_code)

elif isinstance(new_state, FinishedJobSuccess):
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
Expand Down Expand Up @@ -606,6 +619,14 @@ async def _poll_once_by_bhist(
self._bhist_cache_timestamp = time.time()
return _parse_jobs_dict(jobs)

def update_and_log_exec_hosts(self, bjobs_exec_hosts: Dict[str, str]) -> None:
    """Record and log the first real execution host reported for each job.

    Args:
        bjobs_exec_hosts: Mapping from job id to the exec-host field
            parsed from ``bjobs`` output.

    For each entry, the host is stored on the matching ``self._jobs``
    record and logged once, the first time a real host is seen. Entries
    are ignored when:

    * the job id is not present in ``self._jobs`` (previously a
      ``KeyError``),
    * the reported host is the ``"-"`` placeholder, which is also the
      record's default and means no host has been assigned yet, or
    * a host has already been recorded for the job.
    """
    for job_id, exec_hosts in bjobs_exec_hosts.items():
        if job_id not in self._jobs:
            # bjobs output can mention ids this driver is not tracking.
            continue
        job = self._jobs[job_id]
        if exec_hosts == "-" or job.exec_hosts != "-":
            # Either nothing to report yet, or already reported once.
            continue
        logger.info(
            f"Realization {job.iens} was assigned to host: {exec_hosts}"
        )
        job.exec_hosts = exec_hosts

def _build_resource_requirement_arg(self, realization_memory: int) -> List[str]:
resource_requirement_string = build_resource_requirement_string(
self._exclude_hosts,
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/scheduler/bin/bjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_parser() -> argparse.ArgumentParser:


def bjobs_formatter(jobstats: List[Job]) -> str:
return "".join([f"{job.job_id}^{job.job_state}\n" for job in jobstats])
return "".join([f"{job.job_id}^{job.job_state}^-\n" for job in jobstats])


def read(path: Path, default: Optional[str] = None) -> Optional[str]:
Expand Down
66 changes: 55 additions & 11 deletions tests/ert/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
filter_job_ids_on_submission_time,
parse_bhist,
parse_bjobs,
parse_bjobs_exec_hosts,
)
from tests.ert.utils import poll, wait_until

Expand Down Expand Up @@ -428,13 +429,13 @@ def test_parse_bjobs_gives_empty_result_on_random_input(some_text):
"bjobs_output, expected",
[
pytest.param(
"1^RUN",
"1^RUN^-",
{"1": "RUN"},
id="basic",
),
pytest.param("1^DONE", {"1": "DONE"}, id="done"),
pytest.param("1^DONE^-", {"1": "DONE"}, id="done"),
pytest.param(
"1^DONE\n2^RUN",
"1^DONE^-\n2^RUN^-",
{"1": "DONE", "2": "RUN"},
id="two_jobs",
),
Expand All @@ -444,13 +445,42 @@ def test_parse_bjobs_happy_path(bjobs_output, expected):
assert parse_bjobs(bjobs_output) == expected


@pytest.mark.parametrize(
    "bjobs_output, expected",
    [
        pytest.param("1^RUN^abc-comp01", {"1": "abc-comp01"}, id="one_host"),
        pytest.param(
            "1^DONE^abc-comp02\n2^RUN^-",
            {"1": "abc-comp02", "2": "-"},
            id="two_hosts_output",
        ),
    ],
)
def test_parse_bjobs_exec_hosts_happy_path(bjobs_output, expected):
    """Well-formed three-field lines yield a job-id to exec-host mapping."""
    parsed = parse_bjobs_exec_hosts(bjobs_output)
    assert parsed == expected


@given(
st.integers(min_value=1),
nonempty_string_without_whitespace(),
st.from_type(JobState),
)
def test_parse_bjobs(job_id, username, job_state):
assert parse_bjobs(f"{job_id}^{job_state}") == {str(job_id): job_state}
def test_parse_bjobs(job_id, job_state):
assert parse_bjobs(f"{job_id}^{job_state}^-") == {str(job_id): job_state}


@given(
    st.integers(min_value=1),
    st.from_type(JobState),
    nonempty_string_without_whitespace(),
)
def test_parse_bjobs_exec_host(job_id, job_state, exec_host):
    """A line built from a generated id, state and host parses to {id: host}."""
    bjobs_line = f"{job_id}^{job_state}^{exec_host}"
    expected = {str(job_id): exec_host}
    assert parse_bjobs_exec_hosts(bjobs_line) == expected


@given(nonempty_string_without_whitespace().filter(lambda x: x not in valid_jobstates))
Expand All @@ -460,15 +490,15 @@ def test_parse_bjobs_invalid_state_is_ignored(random_state):

def test_parse_bjobs_invalid_state_is_logged(caplog):
# (cannot combine caplog with hypothesis)
parse_bjobs("1^FOO")
parse_bjobs("1^FOO^-")
assert "Unknown state FOO" in caplog.text


@pytest.mark.parametrize(
"bjobs_script, expectation",
[
pytest.param(
"echo '1^DONE'; exit 0",
"echo '1^DONE^-'; exit 0",
does_not_raise(),
id="all-good",
),
Expand All @@ -484,13 +514,13 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
id="empty_cluster_specific_id",
),
pytest.param(
"echo '1^DONE'; echo 'Job <2> is not found' >&2 ; exit 255",
"echo '1^DONE^-'; echo 'Job <2> is not found' >&2 ; exit 255",
# If we have some success and some failures, actual command returns 255
does_not_raise(),
id="error_for_irrelevant_job_id",
),
pytest.param(
"echo '2^DONE'",
"echo '2^DONE^-'",
pytest.raises(asyncio.TimeoutError),
id="wrong-job-id",
),
Expand All @@ -500,7 +530,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
id="exit-1",
),
pytest.param(
"echo '1^DONE'; exit 1",
"echo '1^DONE^-'; exit 1",
# (this is not observed in reality)
does_not_raise(),
id="correct_output_but_exitcode_1",
Expand Down Expand Up @@ -979,6 +1009,20 @@ def not_found_bjobs(monkeypatch, tmp_path):
bjobs_path.chmod(bjobs_path.stat().st_mode | stat.S_IEXEC)


async def test_bjobs_exec_host_logs_only_once(tmp_path, job_name, caplog):
    """The host-assignment message is logged a single time for a job."""
    caplog.set_level(logging.INFO)
    os.chdir(tmp_path)
    driver = LsfDriver()
    await driver.submit(0, "sh", "-c", "sleep 1", name=job_name)

    job_id = next(iter(driver._jobs))
    # Report two different hosts in a row; only the first may be logged.
    for reported_host in ("COMP-01", "COMP-02"):
        driver.update_and_log_exec_hosts({job_id: reported_host})

    await poll(driver, {0})
    assert caplog.text.count("was assigned to host:") == 1


async def test_lsf_stdout_file(tmp_path, job_name):
os.chdir(tmp_path)
driver = LsfDriver()
Expand Down

0 comments on commit 398ce6a

Please sign in to comment.