Skip to content

Commit

Permalink
fixup: change end of events sentinels
Browse files Browse the repository at this point in the history
  • Loading branch information
JHolba committed Aug 30, 2024
1 parent 3acdf62 commit a4ec77b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
8 changes: 6 additions & 2 deletions src/_ert_forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import threading
from datetime import datetime, timedelta
from pathlib import Path
from typing import Type, Union
from typing import Final, Type, Union

from _ert import events
from _ert.events import (
Expand Down Expand Up @@ -37,6 +37,10 @@
logger = logging.getLogger(__name__)


class EndOfEventsSentinel:
pass


class Event(Reporter):
"""
The Event reporter forwards events, coming from the running job, added with
Expand All @@ -53,7 +57,7 @@ class Event(Reporter):
before stopping the reporter. Any remaining events will not be sent.
"""

_sentinel: None = None
_sentinel: Final = EndOfEventsSentinel()

def __init__(self, evaluator_url, token=None, cert_path=None):
self._evaluator_url = evaluator_url
Expand Down
12 changes: 7 additions & 5 deletions src/ert/ensemble_evaluator/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import ssl
import uuid
from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional, Union
from typing import TYPE_CHECKING, Any, AsyncGenerator, Final, Optional, Type, Union

from aiohttp import ClientError
from websockets import ConnectionClosed, Headers, WebSocketClientProtocol
Expand Down Expand Up @@ -30,10 +30,12 @@ class CloseTrackerEvent:


class Monitor:
_sentinel: Final = CloseTrackerEvent()

def __init__(self, ee_con_info: "EvaluatorConnectionInfo") -> None:
self._ee_con_info = ee_con_info
self._id = str(uuid.uuid1()).split("-", maxsplit=1)[0]
self._event_queue: asyncio.Queue[Union[Event, CloseTrackerEvent]] = (
self._event_queue: asyncio.Queue[Union[Event, Type[Monitor._sentinel]]] = (

Check failure on line 38 in src/ert/ensemble_evaluator/monitor.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Variable "ert.ensemble_evaluator.monitor.Monitor._sentinel" is not valid as a type
asyncio.Queue()
)
self._connection: Optional[WebSocketClientProtocol] = None
Expand Down Expand Up @@ -71,7 +73,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None
async def signal_cancel(self) -> None:
if not self._connection:
return
await self._event_queue.put(CloseTrackerEvent())
await self._event_queue.put(Monitor._sentinel)

Check failure on line 76 in src/ert/ensemble_evaluator/monitor.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Argument 1 to "put" of "Queue" has incompatible type "CloseTrackerEvent"; expected "ForwardModelStepStart | ForwardModelStepRunning | ForwardModelStepSuccess | ForwardModelStepFailure | ForwardModelStepChecksum | RealizationPending | RealizationRunning | RealizationSuccess | RealizationFailed | RealizationTimeout | RealizationUnknown | RealizationWaiting | EESnapshot | EESnapshotUpdate | EETerminated | EEUserCancel | EEUserDone | EnsembleStarted | EnsembleSucceeded | EnsembleFailed | EnsembleCancelled | type[Monitor._sentinel?]"
logger.debug(f"monitor-{self._id} asking server to cancel...")

cancel_event = EEUserCancel(monitor=self._id)
Expand All @@ -81,7 +83,7 @@ async def signal_cancel(self) -> None:
async def signal_done(self) -> None:
if not self._connection:
return
await self._event_queue.put(CloseTrackerEvent())
await self._event_queue.put(Monitor._sentinel)

Check failure on line 86 in src/ert/ensemble_evaluator/monitor.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Argument 1 to "put" of "Queue" has incompatible type "CloseTrackerEvent"; expected "ForwardModelStepStart | ForwardModelStepRunning | ForwardModelStepSuccess | ForwardModelStepFailure | ForwardModelStepChecksum | RealizationPending | RealizationRunning | RealizationSuccess | RealizationFailed | RealizationTimeout | RealizationUnknown | RealizationWaiting | EESnapshot | EESnapshotUpdate | EETerminated | EEUserCancel | EEUserDone | EnsembleStarted | EnsembleSucceeded | EnsembleFailed | EnsembleCancelled | type[Monitor._sentinel?]"
logger.debug(f"monitor-{self._id} informing server monitor is done...")

done_event = EEUserDone(monitor=self._id)
Expand All @@ -108,7 +110,7 @@ async def track(
logger.error("Evaluator did not send the TERMINATED event!")
break
event = None
if isinstance(event, CloseTrackerEvent):
if event is Monitor._sentinel:

Check failure on line 113 in src/ert/ensemble_evaluator/monitor.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Non-overlapping identity check (left operand type: "ForwardModelStepStart | ForwardModelStepRunning | ForwardModelStepSuccess | ForwardModelStepFailure | ForwardModelStepChecksum | <17 more items> | None", right operand type: "CloseTrackerEvent")
closetracker_received = True
_heartbeat_interval = self._receiver_timeout
else:
Expand Down

0 comments on commit a4ec77b

Please sign in to comment.