Skip to content

Commit

Permalink
Move class functions to proper functions
Browse files Browse the repository at this point in the history
Partially solving PLR6301
  • Loading branch information
berland committed Dec 17, 2024
1 parent 0e19012 commit 5962ad0
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 63 deletions.
46 changes: 24 additions & 22 deletions src/_ert/forward_model_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,6 @@ def __init__(self, steps_data: dict[str, Any]):

self._set_environment()

def _read_manifest(self):
if not Path("manifest.json").exists():
return None
with open("manifest.json", encoding="utf-8") as f:
data = json.load(f)
return {
name: {"type": "file", "path": str(Path(file).absolute())}
for name, file in data.items()
}

def _populate_checksums(self, manifest):
if not manifest:
return {}
for info in manifest.values():
path = Path(info["path"])
if path.exists():
info["md5sum"] = hashlib.md5(path.read_bytes()).hexdigest()
else:
info["error"] = f"Expected file {path} not created by forward model!"
return manifest

def run(self, names_of_steps_to_run: list[str]):
if not names_of_steps_to_run:
step_queue = self.steps
Expand Down Expand Up @@ -86,7 +65,7 @@ def run(self, names_of_steps_to_run: list[str]):
)
return

checksum_dict = self._populate_checksums(self._read_manifest())
checksum_dict = _populate_checksums(_read_manifest())
yield Checksum(checksum_dict=checksum_dict, run_path=os.getcwd())
yield Finish()

Expand All @@ -96,3 +75,26 @@ def _set_environment(self):
for env_key, env_val in os.environ.items():
value = value.replace(f"${env_key}", env_val)
os.environ[key] = value


def _read_manifest() -> dict[str, dict[str, str]] | None:
if not Path("manifest.json").exists():
return None
with open("manifest.json", encoding="utf-8") as f:
data = json.load(f)
return {
name: {"type": "file", "path": str(Path(file).absolute())}
for name, file in data.items()
}


def _populate_checksums(manifest) -> dict[str, dict[str, str]]:
if not manifest:
return {}
for info in manifest.values():
path = Path(info["path"])
if path.exists():
info["md5sum"] = hashlib.md5(path.read_bytes()).hexdigest()
else:
info["error"] = f"Expected file {path} not created by forward model!"
return manifest
83 changes: 42 additions & 41 deletions src/ert/ensemble_evaluator/_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,34 +144,6 @@ def _create_snapshot(self) -> EnsembleSnapshot:
def get_successful_realizations(self) -> list[int]:
return self.snapshot.get_successful_realizations()

def _log_completed_fm_step(
self, event: FMEvent, step_snapshot: FMStepSnapshot | None
) -> None:
if step_snapshot is None:
logger.warning(f"Should log {event}, but there was no step_snapshot")
return
step_name = step_snapshot.get("name", "")
start_time = step_snapshot.get("start_time")
cpu_seconds = step_snapshot.get("cpu_seconds")
current_memory_usage = step_snapshot.get("current_memory_usage")
if start_time is not None and event.time is not None:
walltime = (event.time - start_time).total_seconds()
else:
# We get here if the Running event is in the same event batch as
# the Success event. That means that runtime is close to zero.
walltime = 0

if walltime > 120:
logger.info(
f"{event.event_type} {step_name} "
f"{walltime=} "
f"{cpu_seconds=} "
f"{current_memory_usage=} "
f"step_index={event.fm_step} "
f"real={event.real} "
f"ensemble={event.ensemble}"
)

def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:
snapshot_mutate_event = EnsembleSnapshot()
for event in events:
Expand All @@ -189,21 +161,10 @@ def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:
.get("fm_steps", {})
.get(event.fm_step)
)
self._log_completed_fm_step(event, step)
_log_completed_fm_step(event, step)

return snapshot_mutate_event

async def send_event(
self,
url: str,
event: Event,
token: str | None = None,
cert: str | bytes | None = None,
retries: int = 10,
) -> None:
async with Client(url, token, cert, max_retries=retries) as client:
await client._send(event_to_json(event))

def generate_event_creator(self) -> Callable[[Id.ENSEMBLE_TYPES], Event]:
def event_builder(status: str) -> Event:
event = {
Expand All @@ -226,7 +187,7 @@ async def evaluate(
self.__class__,
ce_unary_send_method_name,
partialmethod(
self.__class__.send_event,
send_event,
self._config.dispatch_uri,
token=self._config.token,
cert=self._config.cert,
Expand Down Expand Up @@ -328,6 +289,46 @@ def cancel(self) -> None:
logger.debug("evaluator cancelled")


async def send_event(
url: str,
event: Event,
token: str | None = None,
cert: str | bytes | None = None,
retries: int = 10,
) -> None:
async with Client(url, token, cert, max_retries=retries) as client:
await client._send(event_to_json(event))


def _log_completed_fm_step(
event: FMEvent, step_snapshot: FMStepSnapshot | None
) -> None:
if step_snapshot is None:
logger.warning(f"Should log {event}, but there was no step_snapshot")
return
step_name = step_snapshot.get("name", "")
start_time = step_snapshot.get("start_time")
cpu_seconds = step_snapshot.get("cpu_seconds")
current_memory_usage = step_snapshot.get("current_memory_usage")
if start_time is not None and event.time is not None:
walltime = (event.time - start_time).total_seconds()
else:
# We get here if the Running event is in the same event batch as
# the Success event. That means that runtime is close to zero.
walltime = 0

if walltime > 120:
logger.info(
f"{event.event_type} {step_name} "
f"{walltime=} "
f"{cpu_seconds=} "
f"{current_memory_usage=} "
f"step_index={event.fm_step} "
f"real={event.real} "
f"ensemble={event.ensemble}"
)


class _KillAllJobs(Protocol):
def kill_all_jobs(self) -> None: ...

Expand Down

0 comments on commit 5962ad0

Please sign in to comment.