Skip to content

Commit

Permalink
Add reconnection for monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 21, 2024
1 parent 40fd79f commit 0ff0c0b
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions src/ert/ensemble_evaluator/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ async def track(
if event is not None:
self._event_queue.task_done()

async def reconnect(self) -> None:
retries = 5
while retries > 0:
try:
self._socket.connect(self._ee_con_info.router_uri)
break
except zmq.ZMQError as e:
logger.warning(
f"Failed to connect to {self._ee_con_info.router_uri}: {e}"
)
retries -= 1
if retries == 0:
raise e
await asyncio.sleep(1)

async def _receiver(self) -> None:
tls: Optional[ssl.SSLContext] = None
if self._ee_con_info.cert:
Expand All @@ -134,11 +149,7 @@ async def _receiver(self) -> None:

client_id = f"client-{uuid.uuid4().hex[:8]}"
self._socket.setsockopt_string(zmq.IDENTITY, client_id)
try:
self._socket.connect(self._ee_con_info.router_uri)
except zmq.ZMQError as exc:
logger.error(f"Failed to connect to {self._ee_con_info.router_uri} {exc}")
raise
await self.reconnect()
await self._socket.send_multipart([b"", b"CONNECT"])
self._connected.set()

Expand All @@ -152,4 +163,4 @@ async def _receiver(self) -> None:
logger.debug(
f"ZeroMQ connection to EnsembleEvaluator went down, reconnecting: {exc}"
)
await asyncio.sleep(1)
await self.reconnect()

0 comments on commit 0ff0c0b

Please sign in to comment.