diff --git a/GivTCP/givenergy_modbus_async/client/client.py b/GivTCP/givenergy_modbus_async/client/client.py index fa84b50..a0298f2 100644 --- a/GivTCP/givenergy_modbus_async/client/client.py +++ b/GivTCP/givenergy_modbus_async/client/client.py @@ -328,47 +328,53 @@ async def one_shot_command( async def _task_network_consumer(self): """Task for orchestrating incoming data.""" - while hasattr(self, "reader") and self.reader and not self.reader.at_eof(): - frame = await self.reader.read(300) - # await self.debug_frames['all'].put(frame) - for message in self.framer.decode(frame): - _logger.debug("Processing %s", message) - if isinstance(message, ExceptionBase): - _logger.warning( - "Expected response never arrived but resulted in exception: %s", - message, - ) - continue - if isinstance(message, HeartbeatRequest): - _logger.debug("Responding to HeartbeatRequest") - await self.tx_queue.put( - (message.expected_response().encode(), None) - ) - continue - if not isinstance(message, TransparentResponse): - _logger.warning( - "Received unexpected message type for a client: %s", message - ) - continue - if isinstance(message, WriteHoldingRegisterResponse): - if message.error: - _logger.warning("%s", message) - else: - _logger.info("%s", message) - - future = self.expected_responses.get(message.shape_hash()) - - if future and not future.done(): - future.set_result(message) - # try: - self.plant.update(message) - # except RegisterCacheUpdateFailed as e: - # # await self.debug_frames['error'].put(frame) - # _logger.debug(f'Ignoring {message}: {e}') - _logger.debug( - "network_consumer reader at EOF, cannot continue, closing connection" - ) - await self.close() + try: + while hasattr(self, "reader") and self.reader and not self.reader.at_eof(): + frame = await self.reader.read(300) + # await self.debug_frames['all'].put(frame) + for message in self.framer.decode(frame): + _logger.debug("Processing %s", message) + if isinstance(message, ExceptionBase): + _logger.warning( + "Expected response never arrived but resulted in exception: %s", + message, + ) + continue + if isinstance(message, HeartbeatRequest): + _logger.debug("Responding to HeartbeatRequest") + await self.tx_queue.put( + (message.expected_response().encode(), None) + ) + continue + if not isinstance(message, TransparentResponse): + _logger.warning( + "Received unexpected message type for a client: %s", message + ) + continue + if isinstance(message, WriteHoldingRegisterResponse): + if message.error: + _logger.warning("%s", message) + else: + _logger.info("%s", message) + + future = self.expected_responses.get(message.shape_hash()) + + if future and not future.done(): + future.set_result(message) + # try: + self.plant.update(message) + # except RegisterCacheUpdateFailed as e: + # # await self.debug_frames['error'].put(frame) + # _logger.debug(f'Ignoring {message}: {e}') + _logger.debug( + "network_consumer reader at EOF, cannot continue, closing connection" + ) + except Exception as e: + _logger.error( + "network_consumer reader exception {}", e + ) + finally: + await self.close() async def _task_network_producer(self, tx_message_wait: float = 0.25): """Producer loop to transmit queued frames with an appropriate delay."""