diff --git a/src/ert/ensemble_evaluator/monitor.py b/src/ert/ensemble_evaluator/monitor.py index 3388779e67a..d62a0c65eb8 100644 --- a/src/ert/ensemble_evaluator/monitor.py +++ b/src/ert/ensemble_evaluator/monitor.py @@ -118,6 +118,21 @@ async def track( if event is not None: self._event_queue.task_done() + async def reconnect(self) -> None: + retries = 5 + while retries > 0: + try: + self._socket.connect(self._ee_con_info.router_uri) + break + except zmq.ZMQError as e: + logger.warning( + f"Failed to connect to {self._ee_con_info.router_uri}: {e}" + ) + retries -= 1 + if retries == 0: + raise e + await asyncio.sleep(1) + async def _receiver(self) -> None: tls: Optional[ssl.SSLContext] = None if self._ee_con_info.cert: @@ -134,11 +149,7 @@ async def _receiver(self) -> None: client_id = f"client-{uuid.uuid4().hex[:8]}" self._socket.setsockopt_string(zmq.IDENTITY, client_id) - try: - self._socket.connect(self._ee_con_info.router_uri) - except zmq.ZMQError as exc: - logger.error(f"Failed to connect to {self._ee_con_info.router_uri} {exc}") - raise + await self.reconnect() await self._socket.send_multipart([b"", b"CONNECT"]) self._connected.set() @@ -152,4 +163,4 @@ async def _receiver(self) -> None: logger.debug( f"ZeroMQ connection to EnsembleEvaluator went down, reconnecting: {exc}" ) - await asyncio.sleep(1) + await self.reconnect()