diff --git a/src/_ert/events.py b/src/_ert/events.py index 0f810286f9e..99033c7e833 100644 --- a/src/_ert/events.py +++ b/src/_ert/events.py @@ -85,6 +85,7 @@ class ForwardModelStepRunning(ForwardModelStepBaseEvent): event_type: Id.FORWARD_MODEL_STEP_RUNNING_TYPE = Id.FORWARD_MODEL_STEP_RUNNING max_memory_usage: Union[int, None] = None current_memory_usage: Union[int, None] = None + cpu_seconds: float = 0.0 class ForwardModelStepSuccess(ForwardModelStepBaseEvent): diff --git a/src/_ert/forward_model_runner/reporting/event.py b/src/_ert/forward_model_runner/reporting/event.py index ece3d893408..8bf13dee238 100644 --- a/src/_ert/forward_model_runner/reporting/event.py +++ b/src/_ert/forward_model_runner/reporting/event.py @@ -173,6 +173,7 @@ def _job_handler(self, msg: Union[Start, Running, Exited]): **job_msg, max_memory_usage=msg.memory_status.max_rss, current_memory_usage=msg.memory_status.rss, + cpu_seconds=msg.memory_status.cpu_seconds, ) self._dump_event(event) diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index bb7f91cc05e..4afe8e97971 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -1,4 +1,5 @@ import asyncio +import datetime import logging import traceback from contextlib import asynccontextmanager, contextmanager @@ -47,6 +48,7 @@ ) from ert.ensemble_evaluator import identifiers as ids +from ._ensemble import FMStepSnapshot from ._ensemble import LegacyEnsemble as Ensemble from .config import EvaluatorServerConfig from .snapshot import EnsembleSnapshot @@ -161,12 +163,20 @@ async def _stopped_handler(self, events: Sequence[EnsembleSucceeded]) -> None: return max_memory_usage = -1 - for fm_step in self.ensemble.snapshot.get_all_fm_steps().values(): + for (real_id, _), fm_step in self.ensemble.snapshot.get_all_fm_steps().items(): + # Infer max memory usage memory_usage = fm_step.get(ids.MAX_MEMORY_USAGE) or "-1" max_memory_usage = max(int(memory_usage), 
def detect_overspent_cpu(num_cpu: int, real_id: str, fm_step: FMStepSnapshot) -> str:
    """Produce a warning message if a step used more CPU than NUM_CPU allows.

    The obtained parallelization is computed as the consumed cpu seconds
    divided by the wall clock duration of the forward model step. A
    missing start or end time is replaced by the current time.

    Args:
        num_cpu: The NUM_CPU value to compare the obtained parallelization to.
        real_id: Realization identifier, only used in the message text.
        fm_step: Snapshot of the forward model step. The start time, end
            time, cpu seconds and name entries are read from it.

    Returns:
        A message describing the misconfiguration, or an empty string if
        the wall clock duration is not positive or the obtained
        parallelization does not exceed num_cpu.
    """
    start_time = fm_step.get(ids.START_TIME)
    end_time = fm_step.get(ids.END_TIME)

    # The stand-in for a missing timestamp must match the timezone-awareness
    # of the timestamp that is present; subtracting a naive datetime from an
    # aware one (or vice versa) raises TypeError.
    known_time = start_time or end_time
    tzinfo = None
    if known_time is not None and known_time.utcoffset() is not None:
        tzinfo = known_time.tzinfo
    now = datetime.datetime.now(tz=tzinfo)

    duration = ((end_time or now) - (start_time or now)).total_seconds()
    if duration <= 0:
        # Nothing sensible to compute for zero or negative wall clock time.
        return ""

    cpu_seconds = fm_step.get(ids.CPU_SECONDS) or 0.0
    parallelization_obtained = cpu_seconds / duration
    if parallelization_obtained > num_cpu:
        return (
            f"Misconfigured NUM_CPU, forward model step '{fm_step.get(ids.NAME)}' for "
            f"realization {real_id} spent {cpu_seconds} cpu seconds "
            f"with wall clock duration {duration:.1f} seconds, "
            f"a factor of {parallelization_obtained:.2f}, while NUM_CPU was {num_cpu}."
        )
    return ""
@given(
    num_cpu=st.integers(min_value=1, max_value=64),
    # Keep start far enough from datetime.min/max that adding any generated
    # duration (-1 to 10000 seconds) cannot raise OverflowError.
    start=st.datetimes(
        min_value=datetime.datetime.min + datetime.timedelta(seconds=1),
        max_value=datetime.datetime.max - datetime.timedelta(seconds=10000),
    ),
    duration=st.integers(min_value=-1, max_value=10000),
    cpu_seconds=st.floats(min_value=0),
)
def test_overspent_cpu_is_logged(
    num_cpu: int,
    start: datetime.datetime,
    duration: int,
    cpu_seconds: float,
):
    """Check that a warning is produced exactly when the cpu seconds divided
    by a positive wall clock duration exceed num_cpu."""
    message = detect_overspent_cpu(
        num_cpu,
        "dummy",
        FMStepSnapshot(
            start_time=start,
            end_time=start + datetime.timedelta(seconds=duration),
            cpu_seconds=cpu_seconds,
        ),
    )
    if duration > 0 and cpu_seconds / duration > num_cpu:
        assert "Misconfigured NUM_CPU" in message
    else:
        assert "NUM_CPU" not in message