Skip to content

Commit

Permalink
Thread Tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvonthenen committed Jun 12, 2024
1 parent 3177017 commit f95d859
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
57 changes: 47 additions & 10 deletions deepgram/clients/live/v1/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
from typing import Dict, Union, Optional, cast, Any
import threading

import websockets
from websockets.client import WebSocketClientProtocol
Expand Down Expand Up @@ -49,8 +50,8 @@ class AsyncLiveClient: # pylint: disable=too-many-instance-attributes

_socket: WebSocketClientProtocol
_event_handlers: Dict[LiveTranscriptionEvents, list]
_listen_thread: asyncio.Task
_keep_alive_thread: asyncio.Task
_listen_thread: Union[asyncio.Task, None]
_keep_alive_thread: Union[asyncio.Task, None]

_kwargs: Optional[Dict] = None
_addons: Optional[Dict] = None
Expand Down Expand Up @@ -142,6 +143,11 @@ async def start(
)
self._exit_event.clear()

# debug the threads
for thread in threading.enumerate():
self._logger.debug("after running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

# listen thread
self._listen_thread = asyncio.create_task(self._listening())

Expand All @@ -152,6 +158,11 @@ async def start(
else:
self._logger.notice("keepalive is disabled")

# debug the threads
for thread in threading.enumerate():
self._logger.debug("after running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

# push open event
await self._emit(
LiveTranscriptionEvents(LiveTranscriptionEvents.Open),
Expand Down Expand Up @@ -195,11 +206,28 @@ async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None:
"""
Emits events to the registered event handlers.
"""

self._logger.debug("AsyncLiveClient._emit ENTER")

# debug the threads
for thread in threading.enumerate():
self._logger.debug("after running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

tasks = []
for handler in self._event_handlers[event]:
if asyncio.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
asyncio.create_task(handler(self, *args, **kwargs))
tasks.append(asyncio.create_task(handler(self, *args, **kwargs)))

if len(tasks) > 0:
self._logger.debug("waiting for tasks to finish...")
await asyncio.gather(*filter(None, tasks), return_exceptions=True)

# debug the threads
for thread in threading.enumerate():
self._logger.debug("after running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

self._logger.debug("AsyncLiveClient._emit LEAVE")

# pylint: disable=too-many-return-statements,too-many-statements,too-many-locals
async def _listening(self) -> None:
Expand Down Expand Up @@ -608,11 +636,15 @@ async def finish(self) -> bool:
self._logger.verbose("cancelling tasks...")
try:
# Before cancelling, check if the tasks were created
# debug the threads
for thread in threading.enumerate():
self._logger.debug("before running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

tasks = []
if self._config.options.get("keepalive") == "true":
if self._keep_alive_thread is not None:
self._keep_alive_thread.cancel()
tasks.append(self._keep_alive_thread)
if self._keep_alive_thread is not None:
self._keep_alive_thread.cancel()
tasks.append(self._keep_alive_thread)
if self._listen_thread is not None:
self._listen_thread.cancel()
tasks.append(self._listen_thread)
Expand All @@ -621,6 +653,11 @@ async def finish(self) -> bool:
await asyncio.gather(*filter(None, tasks), return_exceptions=True)
self._logger.notice("threads joined")

# debug the threads
for thread in threading.enumerate():
self._logger.debug("after running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

self._logger.notice("finish succeeded")
self._logger.spam("AsyncLiveClient.finish LEAVE")
return True
Expand Down
22 changes: 21 additions & 1 deletion deepgram/clients/live/v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT
import json
import threading
import time
import logging
from typing import Dict, Union, Optional, cast, Any
import threading

from websockets.sync.client import connect, ClientConnection
import websockets
Expand Down Expand Up @@ -141,6 +141,11 @@ def start(
self._socket = connect(url_with_params, additional_headers=combined_headers)
self._exit_event.clear()

# debug the threads
for thread in threading.enumerate():
self._logger.debug("after running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

# listening thread
self._listen_thread = threading.Thread(target=self._listening)
self._listen_thread.start()
Expand All @@ -153,6 +158,11 @@ def start(
else:
self._logger.notice("keepalive is disabled")

# debug the threads
for thread in threading.enumerate():
self._logger.debug("after running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

# push open event
self._emit(
LiveTranscriptionEvents(LiveTranscriptionEvents.Open),
Expand Down Expand Up @@ -593,6 +603,11 @@ def finish(self) -> bool:
"""
self._logger.spam("LiveClient.finish ENTER")

# debug the threads
for thread in threading.enumerate():
self._logger.debug("before running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

# signal exit
self._signal_exit()

Expand All @@ -609,6 +624,11 @@ def finish(self) -> bool:
self._listen_thread = None # type: ignore
self._logger.notice("listening thread joined")

# debug the threads
for thread in threading.enumerate():
self._logger.debug("before running thread: %s", thread.name)
self._logger.debug("number of active threads: %s", threading.active_count())

self._logger.notice("finish succeeded")
self._logger.spam("LiveClient.finish LEAVE")
return True
Expand Down

0 comments on commit f95d859

Please sign in to comment.