Skip to content

Commit

Permalink
Let ensemble evaluator evaluate NUM_CPU inconsistencies
Browse files Browse the repository at this point in the history
If the forward model runner reports a cpu-time that exceeds
duration times NUM_CPU, it implies that the user has misconfigured
something, typically NUM_CPU, or that the forward model does not
respect NUM_CPU.
  • Loading branch information
berland committed Sep 25, 2024
1 parent c0cd23e commit af60cb0
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/_ert/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class ForwardModelStepRunning(ForwardModelStepBaseEvent):
event_type: Id.FORWARD_MODEL_STEP_RUNNING_TYPE = Id.FORWARD_MODEL_STEP_RUNNING
max_memory_usage: Union[int, None] = None
current_memory_usage: Union[int, None] = None
cpu_seconds: float = 0.0


class ForwardModelStepSuccess(ForwardModelStepBaseEvent):
Expand Down
1 change: 1 addition & 0 deletions src/_ert/forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def _job_handler(self, msg: Union[Start, Running, Exited]):
**job_msg,
max_memory_usage=msg.memory_status.max_rss,
current_memory_usage=msg.memory_status.rss,
cpu_seconds=msg.memory_status.cpu_seconds,
)
self._dump_event(event)

Expand Down
33 changes: 32 additions & 1 deletion src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import datetime
import logging
import traceback
from contextlib import asynccontextmanager, contextmanager
Expand Down Expand Up @@ -47,6 +48,7 @@
)
from ert.ensemble_evaluator import identifiers as ids

from ._ensemble import FMStepSnapshot
from ._ensemble import LegacyEnsemble as Ensemble
from .config import EvaluatorServerConfig
from .snapshot import EnsembleSnapshot
Expand Down Expand Up @@ -161,12 +163,20 @@ async def _stopped_handler(self, events: Sequence[EnsembleSucceeded]) -> None:
return

max_memory_usage = -1
for fm_step in self.ensemble.snapshot.get_all_fm_steps().values():
for (real_id, _), fm_step in self.ensemble.snapshot.get_all_fm_steps().items():
# Infer max memory usage
memory_usage = fm_step.get(ids.MAX_MEMORY_USAGE) or "-1"
max_memory_usage = max(int(memory_usage), max_memory_usage)

if cpu_message := detect_overspent_cpu(
self.ensemble.reals[int(real_id)].num_cpu, real_id, fm_step
):
logger.warning(cpu_message)

logger.info(
f"Ensemble ran with maximum memory usage for a single realization job: {max_memory_usage}"
)

await self._append_message(self.ensemble.update_snapshot(events))

async def _cancelled_handler(self, events: Sequence[EnsembleCancelled]) -> None:
Expand Down Expand Up @@ -425,3 +435,24 @@ async def run_and_get_successful_realizations(self) -> List[int]:
def _get_ens_id(source: str) -> str:
    """Extract the ensemble id from an event source path.

    The source is expected to look like ``/ert/ensemble/<ens_id>/...``, so
    after splitting on ``/`` the id is the fourth component (index 3; index 0
    is the empty string before the leading slash).

    Raises:
        IndexError: if the source has fewer than four ``/``-separated parts.
    """
    components = source.split("/")
    ens_id = components[3]
    return ens_id


def detect_overspent_cpu(num_cpu: int, real_id: str, fm_step: FMStepSnapshot) -> str:
    """Build a warning text if a forward model step used more CPU than NUM_CPU.

    The average parallelization of the step is computed as its reported cpu
    seconds divided by its wall clock duration. If that ratio is strictly
    larger than ``num_cpu``, a message describing the mismatch is returned.

    A missing start or end time is substituted by the current time, and a
    missing cpu seconds value is treated as 0.0. Steps whose wall clock
    duration is zero or negative are never reported.

    Returns:
        The warning message, or an empty string when nothing is wrong.
    """
    fallback_time = datetime.datetime.now()
    ended_at = fm_step.get(ids.END_TIME) or fallback_time
    started_at = fm_step.get(ids.START_TIME) or fallback_time
    duration = (ended_at - started_at).total_seconds()
    if duration <= 0:
        # No meaningful wall clock time to compare against.
        return ""

    cpu_seconds = fm_step.get(ids.CPU_SECONDS) or 0.0
    parallelization_obtained = cpu_seconds / duration
    if not (parallelization_obtained > num_cpu):
        return ""

    return (
        f"Misconfigured NUM_CPU, forward model step '{fm_step.get(ids.NAME)}' for "
        f"realization {real_id} spent {cpu_seconds} cpu seconds "
        f"with wall clock duration {duration:.1f} seconds, "
        f"a factor of {parallelization_obtained:.2f}, while NUM_CPU was {num_cpu}."
    )
1 change: 1 addition & 0 deletions src/ert/ensemble_evaluator/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
ERROR: Final = "error"
INDEX: Final = "index"
MAX_MEMORY_USAGE: Final = "max_memory_usage"
CPU_SECONDS: Final = "cpu_seconds"
NAME: Final = "name"
START_TIME: Final = "start_time"
STATUS: Final = "status"
Expand Down
2 changes: 2 additions & 0 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ def update_from_event(
if type(event) is ForwardModelStepRunning:
fm["current_memory_usage"] = event.current_memory_usage
fm["max_memory_usage"] = event.max_memory_usage
fm["cpu_seconds"] = event.cpu_seconds
if type(event) is ForwardModelStepStart:
fm["stdout"] = event.std_out
fm["stderr"] = event.std_err
Expand Down Expand Up @@ -384,6 +385,7 @@ class FMStepSnapshot(TypedDict, total=False):
index: Optional[str]
current_memory_usage: Optional[int]
max_memory_usage: Optional[int]
cpu_seconds: Optional[float]
name: Optional[str]
error: Optional[str]
stdout: Optional[str]
Expand Down
38 changes: 37 additions & 1 deletion tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import datetime
from functools import partial
from typing import cast

import pytest
from hypothesis import given
from hypothesis import strategies as st

from _ert.events import (
EESnapshot,
Expand All @@ -17,7 +20,13 @@
event_to_json,
)
from _ert.forward_model_runner.client import Client
from ert.ensemble_evaluator import EnsembleEvaluator, EnsembleSnapshot, Monitor
from ert.ensemble_evaluator import (
EnsembleEvaluator,
EnsembleSnapshot,
FMStepSnapshot,
Monitor,
)
from ert.ensemble_evaluator.evaluator import detect_overspent_cpu
from ert.ensemble_evaluator.state import (
ENSEMBLE_STATE_STARTED,
ENSEMBLE_STATE_UNKNOWN,
Expand Down Expand Up @@ -465,3 +474,30 @@ async def test_ensure_multi_level_events_in_order(evaluator_to_use):
if "reals" in event.snapshot:
assert ensemble_state == ENSEMBLE_STATE_STARTED
ensemble_state = event.snapshot.get("status", ensemble_state)


@given(
    num_cpu=st.integers(min_value=1, max_value=64),
    # Keep the start time well inside datetime's representable range.
    # An unbounded st.datetimes() can draw values at datetime.min/max, and
    # adding the duration below would then raise OverflowError while the
    # test input is being built, before detect_overspent_cpu is even called.
    start=st.datetimes(
        min_value=datetime.datetime(1970, 1, 1),
        max_value=datetime.datetime(2100, 1, 1),
    ),
    duration=st.integers(min_value=-1, max_value=10000),
    cpu_seconds=st.floats(min_value=0),
)
def test_overspent_cpu_is_logged(
    num_cpu: int,
    start: datetime.datetime,
    duration: int,
    cpu_seconds: float,
):
    """A warning is produced exactly when cpu seconds exceed duration * NUM_CPU.

    Durations of -1 and 0 are included on purpose to cover the case where
    no positive wall clock time is available; no message is expected then.
    """
    message = detect_overspent_cpu(
        num_cpu,
        "dummy",
        FMStepSnapshot(
            start_time=start,
            end_time=start + datetime.timedelta(seconds=duration),
            cpu_seconds=cpu_seconds,
        ),
    )
    if duration > 0 and cpu_seconds / duration > num_cpu:
        assert "Misconfigured NUM_CPU" in message
    else:
        assert "NUM_CPU" not in message

0 comments on commit af60cb0

Please sign in to comment.