Skip to content

Commit

Permalink
Merge pull request #372 from dvonthenen/dont-set-obj-threads-to-none
Browse files Browse the repository at this point in the history
Don't Set Thread References to None
  • Loading branch information
davidvonthenen authored Apr 24, 2024
2 parents 10204e8 + 9d7f800 commit b52c70d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
21 changes: 18 additions & 3 deletions deepgram/clients/live/v1/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ async def _listening(self) -> None:
return

except websockets.exceptions.WebSocketException as e:
if e.code == 1000:
self.logger.notice(f"_listening({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

self.logger.error(
"WebSocketException in AsyncLiveClient._listening: %s", e
)
Expand Down Expand Up @@ -357,6 +362,11 @@ async def _keep_alive(self) -> None:
return

except websockets.exceptions.WebSocketException as e:
if e.code == 1000:
self.logger.notice(f"_keep_alive({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
return

self.logger.error(
"WebSocketException in AsyncLiveClient._keep_alive: %s", e
)
Expand Down Expand Up @@ -419,11 +429,18 @@ async def send(self, data: Union[str, bytes]) -> bool:
await self._socket.send(data)
except websockets.exceptions.ConnectionClosedOK as e:
self.logger.notice(f"send() exiting gracefully: {e.code}")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
self.logger.debug("AsyncLiveClient.send LEAVE")
if self.config.options.get("termination_exception_send") == "true":
raise
return True
except websockets.exceptions.WebSocketException as e:
if e.code == 1000:
self.logger.notice(f"send({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient.send LEAVE")
if self.config.options.get("termination_exception_send") == "true":
raise
return True

self.logger.error("send() failed - WebSocketException: %s", str(e))
self.logger.spam("AsyncLiveClient.send LEAVE")
if self.config.options.get("termination_exception_send") == "true":
Expand Down Expand Up @@ -469,8 +486,6 @@ async def finish(self) -> bool:
# Use asyncio.gather to wait for tasks to be cancelled
await asyncio.gather(*filter(None, tasks), return_exceptions=True)
self.logger.notice("threads joined")
self._listen_thread = None
self._keep_alive_thread = None

self._socket = None

Expand Down
23 changes: 20 additions & 3 deletions deepgram/clients/live/v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def start(

# keepalive thread
if self.config.options.get("keepalive") == "true":
self.logger.notice("keepalive is disabled")
self.logger.notice("keepalive is enabled")
self._keep_alive_thread = threading.Thread(target=self._keep_alive)
self._keep_alive_thread.start()
else:
Expand Down Expand Up @@ -283,6 +283,11 @@ def _listening(self) -> None:
return

except websockets.exceptions.WebSocketException as e:
if e.code == 1000:
self.logger.notice(f"_listening({e.code}) exiting gracefully")
self.logger.debug("LiveClient._listening LEAVE")
return

self.logger.error(
"WebSocketException in AsyncLiveClient._listening: %s", e
)
Expand Down Expand Up @@ -357,6 +362,11 @@ def _keep_alive(self) -> None:
return

except websockets.exceptions.WebSocketException as e:
if e.code == 1000:
self.logger.notice(f"_keep_alive({e.code}) exiting gracefully")
self.logger.debug("LiveClient._keep_alive LEAVE")
return

self.logger.error(
"WebSocketException in AsyncLiveClient._keep_alive: %s", e
)
Expand Down Expand Up @@ -423,6 +433,15 @@ def send(self, data: Union[str, bytes]) -> bool:
raise
return True
except websockets.exceptions.WebSocketException as e:
if e.code == 1000:
self.logger.notice(f"send({e.code}) exiting gracefully")
self.logger.debug("LiveClient.send LEAVE")
if (
self.config.options.get("termination_exception_send")
== "true"
):
raise
return True
self.logger.error("send() failed - WebSocketException: %s", str(e))
self.logger.spam("LiveClient.send LEAVE")
if self.config.options.get("termination_exception_send") == "true":
Expand Down Expand Up @@ -457,12 +476,10 @@ def finish(self) -> bool:
self.logger.verbose("cancelling tasks...")
if self._keep_alive_thread is not None:
self._keep_alive_thread.join()
self._keep_alive_thread = None
self.logger.notice("processing thread joined")

if self._listen_thread is not None:
self._listen_thread.join()
self._listen_thread = None
self.logger.notice("listening thread joined")

self._socket = None
Expand Down

0 comments on commit b52c70d

Please sign in to comment.