Skip to content

Commit

Permalink
Handle exceptions in client._task_network_consumer
Browse files Browse the repository at this point in the history
When exceptions bubble up to _task_network_consumer, log them
and close the connection.

This prevents the network consumer becoming unresponsive
in the face of unhandled exceptions, which then surface as
TimeoutErrors to GivTCP processes which are waiting on
responses from the inverter.
  • Loading branch information
willholley committed Oct 16, 2024
1 parent fe07cd5 commit 352fe9a
Showing 1 changed file with 47 additions and 41 deletions.
88 changes: 47 additions & 41 deletions GivTCP/givenergy_modbus_async/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,47 +328,53 @@ async def one_shot_command(

async def _task_network_consumer(self):
"""Task for orchestrating incoming data."""
while hasattr(self, "reader") and self.reader and not self.reader.at_eof():
frame = await self.reader.read(300)
# await self.debug_frames['all'].put(frame)
for message in self.framer.decode(frame):
_logger.debug("Processing %s", message)
if isinstance(message, ExceptionBase):
_logger.warning(
"Expected response never arrived but resulted in exception: %s",
message,
)
continue
if isinstance(message, HeartbeatRequest):
_logger.debug("Responding to HeartbeatRequest")
await self.tx_queue.put(
(message.expected_response().encode(), None)
)
continue
if not isinstance(message, TransparentResponse):
_logger.warning(
"Received unexpected message type for a client: %s", message
)
continue
if isinstance(message, WriteHoldingRegisterResponse):
if message.error:
_logger.warning("%s", message)
else:
_logger.info("%s", message)

future = self.expected_responses.get(message.shape_hash())

if future and not future.done():
future.set_result(message)
# try:
self.plant.update(message)
# except RegisterCacheUpdateFailed as e:
# # await self.debug_frames['error'].put(frame)
# _logger.debug(f'Ignoring {message}: {e}')
_logger.debug(
"network_consumer reader at EOF, cannot continue, closing connection"
)
await self.close()
try:
while hasattr(self, "reader") and self.reader and not self.reader.at_eof():
frame = await self.reader.read(300)
# await self.debug_frames['all'].put(frame)
for message in self.framer.decode(frame):
_logger.debug("Processing %s", message)
if isinstance(message, ExceptionBase):
_logger.warning(
"Expected response never arrived but resulted in exception: %s",
message,
)
continue
if isinstance(message, HeartbeatRequest):
_logger.debug("Responding to HeartbeatRequest")
await self.tx_queue.put(
(message.expected_response().encode(), None)
)
continue
if not isinstance(message, TransparentResponse):
_logger.warning(
"Received unexpected message type for a client: %s", message
)
continue
if isinstance(message, WriteHoldingRegisterResponse):
if message.error:
_logger.warning("%s", message)
else:
_logger.info("%s", message)

future = self.expected_responses.get(message.shape_hash())

if future and not future.done():
future.set_result(message)
# try:
self.plant.update(message)
# except RegisterCacheUpdateFailed as e:
# # await self.debug_frames['error'].put(frame)
# _logger.debug(f'Ignoring {message}: {e}')
_logger.debug(
"network_consumer reader at EOF, cannot continue, closing connection"
)
except Exception as e:
_logger.error(
"network_consumer reader exception {}", e
)
finally:
await self.close()

async def _task_network_producer(self, tx_message_wait: float = 0.25):
"""Producer loop to transmit queued frames with an appropriate delay."""
Expand Down

0 comments on commit 352fe9a

Please sign in to comment.