diff --git a/deepgram/clients/live/v1/async_client.py b/deepgram/clients/live/v1/async_client.py index 36facb3d..2cba8a48 100644 --- a/deepgram/clients/live/v1/async_client.py +++ b/deepgram/clients/live/v1/async_client.py @@ -5,6 +5,7 @@ import json import logging from typing import Dict, Union, Optional, cast, Any +import threading import websockets from websockets.client import WebSocketClientProtocol @@ -49,8 +50,8 @@ class AsyncLiveClient: # pylint: disable=too-many-instance-attributes _socket: WebSocketClientProtocol _event_handlers: Dict[LiveTranscriptionEvents, list] - _listen_thread: asyncio.Task - _keep_alive_thread: asyncio.Task + _listen_thread: Union[asyncio.Task, None] + _keep_alive_thread: Union[asyncio.Task, None] _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -142,6 +143,11 @@ async def start( ) self._exit_event.clear() + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + # listen thread self._listen_thread = asyncio.create_task(self._listening()) @@ -152,6 +158,11 @@ async def start( else: self._logger.notice("keepalive is disabled") + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + # push open event await self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Open), @@ -195,11 +206,28 @@ async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: """ Emits events to the registered event handlers. """ + + self._logger.debug("AsyncLiveClient._emit ENTER") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + tasks = [] for handler in self._event_handlers[event]: - if asyncio.iscoroutinefunction(handler): - await handler(self, *args, **kwargs) - else: - asyncio.create_task(handler(self, *args, **kwargs)) + tasks.append(asyncio.create_task(handler(self, *args, **kwargs))) + + if len(tasks) > 0: + self._logger.debug("waiting for tasks to finish...") + await asyncio.gather(*filter(None, tasks), return_exceptions=True) + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.debug("AsyncLiveClient._emit LEAVE") # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals async def _listening(self) -> None: @@ -608,11 +636,15 @@ async def finish(self) -> bool: self._logger.verbose("cancelling tasks...") try: # Before cancelling, check if the tasks were created + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + tasks = [] - if self._config.options.get("keepalive") == "true": - if self._keep_alive_thread is not None: - self._keep_alive_thread.cancel() - tasks.append(self._keep_alive_thread) + if self._keep_alive_thread is not None: + self._keep_alive_thread.cancel() + tasks.append(self._keep_alive_thread) if self._listen_thread is not None: self._listen_thread.cancel() tasks.append(self._listen_thread) @@ -621,6 +653,11 @@ async def finish(self) -> bool: await asyncio.gather(*filter(None, tasks), return_exceptions=True) self._logger.notice("threads joined") + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + self._logger.notice("finish succeeded") self._logger.spam("AsyncLiveClient.finish LEAVE") return True diff --git a/deepgram/clients/live/v1/client.py b/deepgram/clients/live/v1/client.py index 8e554582..46f39b45 100644 --- a/deepgram/clients/live/v1/client.py +++ b/deepgram/clients/live/v1/client.py @@ -2,10 +2,10 @@ # Use of this source code is governed by a MIT license that can be found in the LICENSE file. # SPDX-License-Identifier: MIT import json -import threading import time import logging from typing import Dict, Union, Optional, cast, Any +import threading from websockets.sync.client import connect, ClientConnection import websockets @@ -141,6 +141,11 @@ def start( self._socket = connect(url_with_params, additional_headers=combined_headers) self._exit_event.clear() + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + # listening thread self._listen_thread = threading.Thread(target=self._listening) self._listen_thread.start() @@ -153,6 +158,11 @@ def start( else: self._logger.notice("keepalive is disabled") + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + # push open event self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Open), @@ -593,6 +603,11 @@ def finish(self) -> bool: """ self._logger.spam("LiveClient.finish ENTER") + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + # signal exit self._signal_exit() @@ -609,6 +624,11 @@ def finish(self) -> bool: self._listen_thread = None # type: ignore self._logger.notice("listening thread joined") + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + self._logger.notice("finish succeeded") self._logger.spam("LiveClient.finish LEAVE") return True