From 352fe9a00386a1a28c2ec79b79a1deddeed1b7fb Mon Sep 17 00:00:00 2001 From: Will Holley Date: Wed, 16 Oct 2024 22:09:43 +0100 Subject: [PATCH] Handle exceptions in client._task_network_consumer When exceptions bubble up to _task_network_consumer, log them and close the connection. This prevents the network consumer becoming unresponsive in the face of unhandled exceptions, which then surface as TimeoutErrors to GivTCP processes which are waiting on responses from the inverter. --- .../givenergy_modbus_async/client/client.py | 88 ++++++++++--------- 1 file changed, 47 insertions(+), 41 deletions(-) diff --git a/GivTCP/givenergy_modbus_async/client/client.py b/GivTCP/givenergy_modbus_async/client/client.py index fa84b50..a0298f2 100644 --- a/GivTCP/givenergy_modbus_async/client/client.py +++ b/GivTCP/givenergy_modbus_async/client/client.py @@ -328,47 +328,53 @@ async def one_shot_command( async def _task_network_consumer(self): """Task for orchestrating incoming data.""" - while hasattr(self, "reader") and self.reader and not self.reader.at_eof(): - frame = await self.reader.read(300) - # await self.debug_frames['all'].put(frame) - for message in self.framer.decode(frame): - _logger.debug("Processing %s", message) - if isinstance(message, ExceptionBase): - _logger.warning( - "Expected response never arrived but resulted in exception: %s", - message, - ) - continue - if isinstance(message, HeartbeatRequest): - _logger.debug("Responding to HeartbeatRequest") - await self.tx_queue.put( - (message.expected_response().encode(), None) - ) - continue - if not isinstance(message, TransparentResponse): - _logger.warning( - "Received unexpected message type for a client: %s", message - ) - continue - if isinstance(message, WriteHoldingRegisterResponse): - if message.error: - _logger.warning("%s", message) - else: - _logger.info("%s", message) - - future = self.expected_responses.get(message.shape_hash()) - - if future and not future.done(): - future.set_result(message) - # try: - self.plant.update(message) - # except RegisterCacheUpdateFailed as e: - # # await self.debug_frames['error'].put(frame) - # _logger.debug(f'Ignoring {message}: {e}') - _logger.debug( - "network_consumer reader at EOF, cannot continue, closing connection" - ) - await self.close() + try: + while hasattr(self, "reader") and self.reader and not self.reader.at_eof(): + frame = await self.reader.read(300) + # await self.debug_frames['all'].put(frame) + for message in self.framer.decode(frame): + _logger.debug("Processing %s", message) + if isinstance(message, ExceptionBase): + _logger.warning( + "Expected response never arrived but resulted in exception: %s", + message, + ) + continue + if isinstance(message, HeartbeatRequest): + _logger.debug("Responding to HeartbeatRequest") + await self.tx_queue.put( + (message.expected_response().encode(), None) + ) + continue + if not isinstance(message, TransparentResponse): + _logger.warning( + "Received unexpected message type for a client: %s", message + ) + continue + if isinstance(message, WriteHoldingRegisterResponse): + if message.error: + _logger.warning("%s", message) + else: + _logger.info("%s", message) + + future = self.expected_responses.get(message.shape_hash()) + + if future and not future.done(): + future.set_result(message) + # try: + self.plant.update(message) + # except RegisterCacheUpdateFailed as e: + # # await self.debug_frames['error'].put(frame) + # _logger.debug(f'Ignoring {message}: {e}') + _logger.debug( + "network_consumer reader at EOF, cannot continue, closing connection" + ) + except Exception as e: + _logger.error( + "network_consumer reader exception {}", e + ) + finally: + await self.close() async def _task_network_producer(self, tx_message_wait: float = 0.25): """Producer loop to transmit queued frames with an appropriate delay."""