diff --git a/src/_ert/forward_model_runner/runner.py b/src/_ert/forward_model_runner/runner.py index f1186977b9a..f600a16888e 100644 --- a/src/_ert/forward_model_runner/runner.py +++ b/src/_ert/forward_model_runner/runner.py @@ -28,27 +28,6 @@ def __init__(self, steps_data: dict[str, Any]): self._set_environment() - def _read_manifest(self): - if not Path("manifest.json").exists(): - return None - with open("manifest.json", encoding="utf-8") as f: - data = json.load(f) - return { - name: {"type": "file", "path": str(Path(file).absolute())} - for name, file in data.items() - } - - def _populate_checksums(self, manifest): - if not manifest: - return {} - for info in manifest.values(): - path = Path(info["path"]) - if path.exists(): - info["md5sum"] = hashlib.md5(path.read_bytes()).hexdigest() - else: - info["error"] = f"Expected file {path} not created by forward model!" - return manifest - def run(self, names_of_steps_to_run: list[str]): if not names_of_steps_to_run: step_queue = self.steps @@ -86,7 +65,7 @@ def run(self, names_of_steps_to_run: list[str]): ) return - checksum_dict = self._populate_checksums(self._read_manifest()) + checksum_dict = _populate_checksums(_read_manifest()) yield Checksum(checksum_dict=checksum_dict, run_path=os.getcwd()) yield Finish() @@ -96,3 +75,26 @@ def _set_environment(self): for env_key, env_val in os.environ.items(): value = value.replace(f"${env_key}", env_val) os.environ[key] = value + + +def _read_manifest() -> dict[str, dict[str, str]] | None: + if not Path("manifest.json").exists(): + return None + with open("manifest.json", encoding="utf-8") as f: + data = json.load(f) + return { + name: {"type": "file", "path": str(Path(file).absolute())} + for name, file in data.items() + } + + +def _populate_checksums(manifest) -> dict[str, dict[str, str]]: + if not manifest: + return {} + for info in manifest.values(): + path = Path(info["path"]) + if path.exists(): + info["md5sum"] = hashlib.md5(path.read_bytes()).hexdigest() + else: + info["error"] = f"Expected file {path} not created by forward model!" + return manifest diff --git a/src/ert/ensemble_evaluator/_ensemble.py b/src/ert/ensemble_evaluator/_ensemble.py index da9af3aec73..2f915b11fae 100644 --- a/src/ert/ensemble_evaluator/_ensemble.py +++ b/src/ert/ensemble_evaluator/_ensemble.py @@ -144,34 +144,6 @@ def _create_snapshot(self) -> EnsembleSnapshot: def get_successful_realizations(self) -> list[int]: return self.snapshot.get_successful_realizations() - def _log_completed_fm_step( - self, event: FMEvent, step_snapshot: FMStepSnapshot | None - ) -> None: - if step_snapshot is None: - logger.warning(f"Should log {event}, but there was no step_snapshot") - return - step_name = step_snapshot.get("name", "") - start_time = step_snapshot.get("start_time") - cpu_seconds = step_snapshot.get("cpu_seconds") - current_memory_usage = step_snapshot.get("current_memory_usage") - if start_time is not None and event.time is not None: - walltime = (event.time - start_time).total_seconds() - else: - # We get here if the Running event is in the same event batch as - # the Success event. That means that runtime is close to zero. - walltime = 0 - - if walltime > 120: - logger.info( - f"{event.event_type} {step_name} " - f"{walltime=} " - f"{cpu_seconds=} " - f"{current_memory_usage=} " - f"step_index={event.fm_step} " - f"real={event.real} " - f"ensemble={event.ensemble}" - ) - def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot: snapshot_mutate_event = EnsembleSnapshot() for event in events: @@ -189,21 +161,10 @@ def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot: .get("fm_steps", {}) .get(event.fm_step) ) - self._log_completed_fm_step(event, step) + _log_completed_fm_step(event, step) return snapshot_mutate_event - async def send_event( - self, - url: str, - event: Event, - token: str | None = None, - cert: str | bytes | None = None, - retries: int = 10, - ) -> None: - async with Client(url, token, cert, max_retries=retries) as client: - await client._send(event_to_json(event)) - def generate_event_creator(self) -> Callable[[Id.ENSEMBLE_TYPES], Event]: def event_builder(status: str) -> Event: event = { @@ -226,7 +187,7 @@ async def evaluate( self.__class__, ce_unary_send_method_name, partialmethod( - self.__class__.send_event, + send_event, self._config.dispatch_uri, token=self._config.token, cert=self._config.cert, @@ -328,6 +289,46 @@ def cancel(self) -> None: logger.debug("evaluator cancelled") +async def send_event( + url: str, + event: Event, + token: str | None = None, + cert: str | bytes | None = None, + retries: int = 10, +) -> None: + async with Client(url, token, cert, max_retries=retries) as client: + await client._send(event_to_json(event)) + + +def _log_completed_fm_step( + event: FMEvent, step_snapshot: FMStepSnapshot | None +) -> None: + if step_snapshot is None: + logger.warning(f"Should log {event}, but there was no step_snapshot") + return + step_name = step_snapshot.get("name", "") + start_time = step_snapshot.get("start_time") + cpu_seconds = step_snapshot.get("cpu_seconds") + current_memory_usage = step_snapshot.get("current_memory_usage") + if start_time is not None and event.time is not None: + walltime = (event.time - start_time).total_seconds() + else: + # We get here if the Running event is in the same event batch as + # the Success event. That means that runtime is close to zero. + walltime = 0 + + if walltime > 120: + logger.info( + f"{event.event_type} {step_name} " + f"{walltime=} " + f"{cpu_seconds=} " + f"{current_memory_usage=} " + f"step_index={event.fm_step} " + f"real={event.real} " + f"ensemble={event.ensemble}" + ) + + class _KillAllJobs(Protocol): def kill_all_jobs(self) -> None: ...