Skip to content

Commit

Permalink
Remove use of deprecated types
Browse files Browse the repository at this point in the history
  • Loading branch information
eivindjahren committed Dec 10, 2024
1 parent 68d01c7 commit 44cdd01
Show file tree
Hide file tree
Showing 90 changed files with 1,021 additions and 1,227 deletions.
246 changes: 123 additions & 123 deletions docs/everest/config_generated.rst

Large diffs are not rendered by default.

99 changes: 45 additions & 54 deletions src/_ert/events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Annotated, Any, Dict, Final, Literal, Union
from typing import Annotated, Any, Final, Literal, Optional

from pydantic import BaseModel, ConfigDict, Field, TypeAdapter

Expand Down Expand Up @@ -39,12 +39,12 @@ class Id:
ENSEMBLE_SUCCEEDED: Final = "ensemble.succeeded"
ENSEMBLE_CANCELLED: Final = "ensemble.cancelled"
ENSEMBLE_FAILED: Final = "ensemble.failed"
ENSEMBLE_TYPES = Union[
ENSEMBLE_STARTED_TYPE,
ENSEMBLE_FAILED_TYPE,
ENSEMBLE_SUCCEEDED_TYPE,
ENSEMBLE_CANCELLED_TYPE,
]
ENSEMBLE_TYPES = (
ENSEMBLE_STARTED_TYPE
| ENSEMBLE_FAILED_TYPE
| ENSEMBLE_SUCCEEDED_TYPE
| ENSEMBLE_CANCELLED_TYPE
)

EE_SNAPSHOT_TYPE = Literal["ee.snapshot"]
EE_SNAPSHOT_UPDATE_TYPE = Literal["ee.snapshot_update"]
Expand All @@ -64,47 +64,47 @@ class BaseEvent(BaseModel):


class ForwardModelStepBaseEvent(BaseEvent):
ensemble: Union[str, None] = None
ensemble: Optional[str] = None
real: str
fm_step: str


class ForwardModelStepStart(ForwardModelStepBaseEvent):
event_type: Id.FORWARD_MODEL_STEP_START_TYPE = Id.FORWARD_MODEL_STEP_START
std_out: Union[str, None] = None
std_err: Union[str, None] = None
std_out: Optional[str] = None
std_err: Optional[str] = None


class ForwardModelStepRunning(ForwardModelStepBaseEvent):
event_type: Id.FORWARD_MODEL_STEP_RUNNING_TYPE = Id.FORWARD_MODEL_STEP_RUNNING
max_memory_usage: Union[int, None] = None
current_memory_usage: Union[int, None] = None
max_memory_usage: Optional[int] = None
current_memory_usage: Optional[int] = None
cpu_seconds: float = 0.0


class ForwardModelStepSuccess(ForwardModelStepBaseEvent):
event_type: Id.FORWARD_MODEL_STEP_SUCCESS_TYPE = Id.FORWARD_MODEL_STEP_SUCCESS
current_memory_usage: Union[int, None] = None
current_memory_usage: Optional[int] = None


class ForwardModelStepFailure(ForwardModelStepBaseEvent):
event_type: Id.FORWARD_MODEL_STEP_FAILURE_TYPE = Id.FORWARD_MODEL_STEP_FAILURE
error_msg: str
exit_code: Union[int, None] = None
exit_code: Optional[int] = None


class ForwardModelStepChecksum(BaseEvent):
event_type: Id.FORWARD_MODEL_STEP_CHECKSUM_TYPE = Id.FORWARD_MODEL_STEP_CHECKSUM
ensemble: Union[str, None] = None
ensemble: Optional[str] = None
real: str
checksums: Dict[str, Dict[str, Any]]
checksums: dict[str, dict[str, Any]]


class RealizationBaseEvent(BaseEvent):
real: str
ensemble: Union[str, None] = None
queue_event_type: Union[str, None] = None
exec_hosts: Union[str, None] = None
ensemble: Optional[str] = None
queue_event_type: Optional[str] = None
exec_hosts: Optional[str] = None


class RealizationPending(RealizationBaseEvent):
Expand All @@ -121,7 +121,7 @@ class RealizationSuccess(RealizationBaseEvent):

class RealizationFailed(RealizationBaseEvent):
event_type: Id.REALIZATION_FAILURE_TYPE = Id.REALIZATION_FAILURE
message: Union[str, None] = None # Only used for JobState.FAILED
message: Optional[str] = None # Only used for JobState.FAILED


class RealizationUnknown(RealizationBaseEvent):
Expand All @@ -137,7 +137,7 @@ class RealizationTimeout(RealizationBaseEvent):


class EnsembleBaseEvent(BaseEvent):
ensemble: Union[str, None] = None
ensemble: Optional[str] = None


class EnsembleStarted(EnsembleBaseEvent):
Expand Down Expand Up @@ -168,7 +168,7 @@ class EESnapshotUpdate(EnsembleBaseEvent):

class EETerminated(BaseEvent):
event_type: Id.EE_TERMINATED_TYPE = Id.EE_TERMINATED
ensemble: Union[str, None] = None
ensemble: Optional[str] = None


class EEUserCancel(BaseEvent):
Expand All @@ -181,39 +181,30 @@ class EEUserDone(BaseEvent):
monitor: str


FMEvent = Union[
ForwardModelStepStart,
ForwardModelStepRunning,
ForwardModelStepSuccess,
ForwardModelStepFailure,
]
FMEvent = (
ForwardModelStepStart
| ForwardModelStepRunning
| ForwardModelStepSuccess
| ForwardModelStepFailure
)

RealizationEvent = Union[
RealizationPending,
RealizationRunning,
RealizationSuccess,
RealizationFailed,
RealizationTimeout,
RealizationUnknown,
RealizationWaiting,
]
RealizationEvent = (
RealizationPending
| RealizationRunning
| RealizationSuccess
| RealizationFailed
| RealizationTimeout
| RealizationUnknown
| RealizationWaiting
)

EnsembleEvent = Union[
EnsembleStarted, EnsembleSucceeded, EnsembleFailed, EnsembleCancelled
]
EnsembleEvent = EnsembleStarted | EnsembleSucceeded | EnsembleFailed | EnsembleCancelled

EEEvent = Union[EESnapshot, EESnapshotUpdate, EETerminated, EEUserCancel, EEUserDone]
EEEvent = EESnapshot | EESnapshotUpdate | EETerminated | EEUserCancel | EEUserDone

Event = Union[
FMEvent, ForwardModelStepChecksum, RealizationEvent, EEEvent, EnsembleEvent
]
Event = FMEvent | ForwardModelStepChecksum | RealizationEvent | EEEvent | EnsembleEvent

DispatchEvent = Union[
FMEvent,
ForwardModelStepChecksum,
RealizationEvent,
EnsembleEvent,
]
DispatchEvent = FMEvent | ForwardModelStepChecksum | RealizationEvent | EnsembleEvent

_DISPATCH_EVENTS_ANNOTATION = Annotated[
DispatchEvent, Field(discriminator="event_type")
Expand All @@ -226,21 +217,21 @@ class EEUserDone(BaseEvent):
EventAdapter: TypeAdapter[Event] = TypeAdapter(_ALL_EVENTS_ANNOTATION)


def dispatch_event_from_json(raw_msg: Union[str, bytes]) -> DispatchEvent:
def dispatch_event_from_json(raw_msg: str | bytes) -> DispatchEvent:
return DispatchEventAdapter.validate_json(raw_msg)


def event_from_json(raw_msg: Union[str, bytes]) -> Event:
def event_from_json(raw_msg: str | bytes) -> Event:
return EventAdapter.validate_json(raw_msg)


def event_from_dict(dict_msg: Dict[str, Any]) -> Event:
def event_from_dict(dict_msg: dict[str, Any]) -> Event:
return EventAdapter.validate_python(dict_msg)


def event_to_json(event: Event) -> str:
return event.model_dump_json()


def event_to_dict(event: Event) -> Dict[str, Any]:
def event_to_dict(event: Event) -> dict[str, Any]:
return event.model_dump()
3 changes: 1 addition & 2 deletions src/_ert/forward_model_runner/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import asyncio
import logging
import ssl
from typing import Any, AnyStr, Optional, Union
from typing import Any, AnyStr, Optional, Self, Union

from typing_extensions import Self
from websockets.asyncio.client import ClientConnection, connect
from websockets.datastructures import Headers
from websockets.exceptions import (
Expand Down
34 changes: 12 additions & 22 deletions src/_ert/forward_model_runner/forward_model_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,7 @@
from datetime import datetime as dt
from pathlib import Path
from subprocess import Popen, run
from typing import (
TYPE_CHECKING,
Dict,
Generator,
List,
Optional,
Sequence,
Set,
Tuple,
cast,
)
from typing import TYPE_CHECKING, Generator, Sequence, cast

from psutil import AccessDenied, NoSuchProcess, Process, TimeoutExpired, ZombieProcess

Expand All @@ -40,7 +30,7 @@
logger = logging.getLogger(__name__)


def killed_by_oom(pids: Set[int]) -> bool:
def killed_by_oom(pids: set[int]) -> bool:
"""Will try to detect if a process (or any of its descendants) was killed
by the Linux OOM-killer.
Expand Down Expand Up @@ -117,7 +107,7 @@ def create_start_message_and_check_job_files(self) -> Start:
start_message = start_message.with_error("\n".join(errors))
return start_message

def _build_arg_list(self) -> List[str]:
def _build_arg_list(self) -> list[str]:
executable = self.job_data.get("executable")
# assert executable is not None
combined_arg_list = [executable]
Expand All @@ -127,7 +117,7 @@ def _build_arg_list(self) -> List[str]:

def _open_file_handles(
self,
) -> Tuple[
) -> tuple[
io.TextIOWrapper | None, io.TextIOWrapper | None, io.TextIOWrapper | None
]:
if self.job_data.get("stdin"):
Expand Down Expand Up @@ -155,7 +145,7 @@ def _open_file_handles(

return (stdin, stdout, stderr)

def _create_environment(self) -> Optional[Dict[str, str]]:
def _create_environment(self) -> dict[str, str] | None:
combined_environment = None
if environment := self.job_data.get("environment"):
combined_environment = {**os.environ, **environment}
Expand All @@ -174,7 +164,7 @@ def _run(self) -> Generator[Start | Exited | Running | None]:
# stdin/stdout/stderr are closed at the end of this function

target_file = self.job_data.get("target_file")
target_file_mtime: Optional[int] = _get_target_file_ntime(target_file)
target_file_mtime: int | None = _get_target_file_ntime(target_file)
run_start_time = dt.now()
try:
proc = Popen(
Expand Down Expand Up @@ -234,9 +224,9 @@ def _run(self) -> Generator[Start | Exited | Running | None]:
def _create_exited_message_based_on_exit_code(
self,
max_memory_usage: int,
target_file_mtime: Optional[int],
target_file_mtime: int | None,
exit_code: int,
fm_step_pids: Set[int],
fm_step_pids: set[int],
) -> Exited:
if exit_code != 0:
exited_message = self._create_exited_msg_for_non_zero_exit_code(
Expand All @@ -263,7 +253,7 @@ def _create_exited_msg_for_non_zero_exit_code(
self,
max_memory_usage: int,
exit_code: int,
fm_step_pids: Set[int],
fm_step_pids: set[int],
) -> Exited:
# All child pids for the forward model step. Need to track these in order to be able
# to detect OOM kills in case of failure.
Expand All @@ -283,7 +273,7 @@ def _create_exited_msg_for_non_zero_exit_code(
)

def handle_process_timeout_and_create_exited_msg(
self, exit_code: Optional[int], proc: Popen[Process], run_start_time: dt
self, exit_code: int | None, proc: Popen[Process], run_start_time: dt
) -> Exited | None:
max_running_minutes = self.job_data.get("max_running_minutes")

Expand Down Expand Up @@ -421,7 +411,7 @@ def _assert_arg_list(self):
return errors


def _get_target_file_ntime(file: Optional[str]) -> Optional[int]:
def _get_target_file_ntime(file: str | None) -> int | None:
mtime = None
if file and os.path.exists(file):
stat = os.stat(file)
Expand All @@ -437,7 +427,7 @@ def ensure_file_handles_closed(file_handles: Sequence[io.TextIOWrapper | None])

def _get_processtree_data(
process: Process,
) -> Tuple[int, float, Optional[int], Set[int]]:
) -> tuple[int, float, int | None, set[int]]:
"""Obtain the oom_score (the Linux kernel uses this number to
decide which process to kill first in out-of-memory siturations).
Expand Down
3 changes: 2 additions & 1 deletion src/_ert/forward_model_runner/reporting/message.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import dataclasses
from datetime import datetime as dt
from typing import TYPE_CHECKING, Dict, Literal, Optional, TypedDict
from typing import TYPE_CHECKING, Dict, Literal, Optional

import psutil
from typing_extensions import TypedDict

if TYPE_CHECKING:
from _ert.forward_model_runner.forward_model_step import ForwardModelStep
Expand Down
Loading

0 comments on commit 44cdd01

Please sign in to comment.