From 7421acda02e0f1fc516b46db621ebd9eb7cb7108 Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Tue, 11 Jun 2024 13:24:17 -0700 Subject: [PATCH] Implement Auto Flush --- deepgram/clients/live/v1/async_client.py | 250 +++++++++++++++-- deepgram/clients/live/v1/client.py | 256 ++++++++++++++++-- deepgram/options.py | 80 ++++-- .../auto_flush/async_microphone_mute/main.py | 174 ++++++++++++ .../auto_flush/microphone_mute/main.py | 148 ++++++++++ 5 files changed, 843 insertions(+), 65 deletions(-) create mode 100644 tests/edge_cases/auto_flush/async_microphone_mute/main.py create mode 100644 tests/edge_cases/auto_flush/microphone_mute/main.py diff --git a/deepgram/clients/live/v1/async_client.py b/deepgram/clients/live/v1/async_client.py index 36facb3d..7cba1049 100644 --- a/deepgram/clients/live/v1/async_client.py +++ b/deepgram/clients/live/v1/async_client.py @@ -5,6 +5,7 @@ import json import logging from typing import Dict, Union, Optional, cast, Any +from datetime import datetime import websockets from websockets.client import WebSocketClientProtocol @@ -28,6 +29,7 @@ from .options import LiveOptions ONE_SECOND = 1 +HALF_SECOND = 0.5 DEEPGRAM_INTERVAL = 5 PING_INTERVAL = 20 @@ -49,8 +51,12 @@ class AsyncLiveClient: # pylint: disable=too-many-instance-attributes _socket: WebSocketClientProtocol _event_handlers: Dict[LiveTranscriptionEvents, list] - _listen_thread: asyncio.Task - _keep_alive_thread: asyncio.Task + + _last_datagram: Optional[datetime] = None + + _listen_thread: Union[asyncio.Task, None] + _keep_alive_thread: Union[asyncio.Task, None] + _flush_thread: Union[asyncio.Task, None] _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -67,7 +73,16 @@ def __init__(self, config: DeepgramClientOptions): self._config = config self._endpoint = "v1/listen" + + self._listen_thread = None + self._keep_alive_thread = None + self._flush_thread = None + + # exit self._exit_event = asyncio.Event() + + # auto flush + self._flush_event = asyncio.Event() self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() } @@ -112,7 +127,7 @@ async def start( if isinstance(options, LiveOptions): self._logger.info("LiveOptions switching class -> dict") - self._options = cast(Dict[str, str], options.to_dict()) + self._options = options.to_dict() elif options is not None: self._options = options else: @@ -146,12 +161,19 @@ async def start( self._listen_thread = asyncio.create_task(self._listening()) # keepalive thread - if self._config.options.get("keepalive") == "true": + if self._config.is_keep_alive_enabled(): self._logger.notice("keepalive is enabled") self._keep_alive_thread = asyncio.create_task(self._keep_alive()) else: self._logger.notice("keepalive is disabled") + # flush thread + if self._config.is_auto_flush_enabled(): + self._logger.notice("autoflush is enabled") + self._flush_thread = asyncio.create_task(self._flush()) + else: + self._logger.notice("autoflush is disabled") + # push open event await self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Open), @@ -186,7 +208,7 @@ def on(self, event: LiveTranscriptionEvents, handler) -> None: """ Registers event handlers for specific events. """ - self._logger.info("event fired: %s", event) + self._logger.info("event subscribed: %s", event) if event in LiveTranscriptionEvents.__members__.values() and callable(handler): self._event_handlers[event].append(handler) @@ -195,13 +217,14 @@ async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: """ Emits events to the registered event handlers. """ + self._logger.debug("callback handlers for: %s", event) for handler in self._event_handlers[event]: if asyncio.iscoroutinefunction(handler): await handler(self, *args, **kwargs) else: asyncio.create_task(handler(self, *args, **kwargs)) - # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches async def _listening(self) -> None: """ Listens for messages from the WebSocket connection. @@ -244,6 +267,13 @@ async def _listening(self) -> None: message ) self._logger.verbose("LiveResultResponse: %s", msg_result) + + # auto flush + if self._config.is_inspecting_messages(): + inspect_res = await self._inspect(msg_result) + if not inspect_res: + self._logger.error("inspect_res failed") + await self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), result=msg_result, @@ -426,8 +456,7 @@ async def _keep_alive(self) -> None: # deepgram keepalive if counter % DEEPGRAM_INTERVAL == 0: - self._logger.verbose("Sending KeepAlive...") - await self.send(json.dumps({"type": "KeepAlive"})) + await self.keep_alive() except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice(f"_keep_alive({e.code}) exiting gracefully") @@ -514,6 +543,132 @@ async def _keep_alive(self) -> None: raise return + ## pylint: disable=too-many-return-statements,too-many-statements + async def _flush(self) -> None: + self._logger.debug("AsyncLiveClient._flush ENTER") + + delta_in_ms_str = self._config.options.get("auto_flush_reply_delta") + if delta_in_ms_str is None: + self._logger.error("auto_flush_reply_delta is None") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + delta_in_ms = float(delta_in_ms_str) + + while True: + try: + await asyncio.sleep(HALF_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_flush exiting gracefully") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + + if self._socket is None: + self._logger.notice("socket is None, exiting flush") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + + if self._last_datagram is None: + self._logger.debug("AutoFlush last_datagram is None") + continue + + delta = datetime.now() - self._last_datagram + diff_in_ms = delta.total_seconds() * 1000 + self._logger.debug("AutoFlush delta: %f", diff_in_ms) + if diff_in_ms < delta_in_ms: + self._logger.debug("AutoFlush delta is less than threshold") + continue + + self._last_datagram = None + await self.finalize() + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code == 1000: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + + self._logger.error( + "ConnectionClosed in AsyncLiveClient._flush with code %s: %s", + e.code, + e.reason, + ) + cc_error: ErrorResponse = ErrorResponse( + "ConnectionClosed in AsyncLiveClient._flush", + f"{e}", + "ConnectionClosed", + ) + await self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), + error=cc_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncLiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncLiveClient._flush: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in AsyncLiveClient._flush", + f"{e}", + "Exception", + ) + await self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncLiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in AsyncLiveClient._flush: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncLiveClient._flush", + f"{e}", + "Exception", + ) + self._logger.error("Exception in AsyncLiveClient._flush: %s", str(e)) + await self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncLiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements + + # pylint: disable=too-many-return-statements + async def send(self, data: Union[str, bytes]) -> bool: """ Sends data over the WebSocket connection. @@ -570,6 +725,35 @@ async def send(self, data: Union[str, bytes]) -> bool: # pylint: enable=too-many-return-statements + async def keep_alive(self) -> bool: + """ + Sends a KeepAlive message + """ + self._logger.spam("AsyncLiveClient.keep_alive ENTER") + + if self._exit_event.is_set(): + self._logger.notice("keep_alive exiting gracefully") + self._logger.debug("AsyncLiveClient.keep_alive LEAVE") + return False + + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("AsyncLiveClient.keep_alive LEAVE") + return False + + self._logger.notice("Sending KeepAlive...") + ret = await self.send(json.dumps({"type": "KeepAlive"})) + + if not ret: + self._logger.error("keep_alive failed") + self._logger.spam("AsyncLiveClient.keep_alive LEAVE") + return False + + self._logger.notice("keep_alive succeeded") + self._logger.spam("AsyncLiveClient.keep_alive LEAVE") + + return True + async def finalize(self) -> bool: """ Finalizes the Transcript connection by flushing it @@ -581,14 +765,18 @@ async def finalize(self) -> bool: self._logger.debug("AsyncLiveClient.finalize LEAVE") return False - if self._socket is not None: - self._logger.notice("sending Finalize...") - ret = await self.send(json.dumps({"type": "Finalize"})) + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("AsyncLiveClient.finalize LEAVE") + return False - if not ret: - self._logger.error("finalize failed") - self._logger.spam("AsyncLiveClient.finalize LEAVE") - return False + self._logger.notice("Sending Finalize...") + ret = await self.send(json.dumps({"type": "Finalize"})) + + if not ret: + self._logger.error("finalize failed") + self._logger.spam("AsyncLiveClient.finalize LEAVE") + return False self._logger.notice("finalize succeeded") self._logger.spam("AsyncLiveClient.finalize LEAVE") @@ -609,13 +797,20 @@ async def finish(self) -> bool: try: # Before cancelling, check if the tasks were created tasks = [] - if self._config.options.get("keepalive") == "true": - if self._keep_alive_thread is not None: - self._keep_alive_thread.cancel() - tasks.append(self._keep_alive_thread) + if self._keep_alive_thread is not None: + self._keep_alive_thread.cancel() + tasks.append(self._keep_alive_thread) + self._logger.notice("processing _keep_alive_thread cancel...") + + if self._flush_thread is not None: + self._flush_thread.cancel() + tasks.append(self._flush_thread) + self._logger.notice("processing _flush_thread cancel...") + if self._listen_thread is not None: self._listen_thread.cancel() tasks.append(self._listen_thread) + self._logger.notice("processing _listen_thread cancel...") # Use asyncio.gather to wait for tasks to be cancelled await asyncio.gather(*filter(None, tasks), return_exceptions=True) @@ -673,3 +868,20 @@ async def _signal_exit(self) -> None: self._logger.error("socket.wait_closed failed: %s", e) self._socket = None # type: ignore + + async def _inspect(self, msg_result: LiveResultResponse) -> bool: + sentence = msg_result.channel.alternatives[0].transcript + if len(sentence) == 0: + return True + + if msg_result.is_final: + self._logger.debug("AutoFlush is_final received") + self._last_datagram = None + else: + self._last_datagram = datetime.now() + self._logger.debug( + "AutoFlush interim received: %s", + str(self._last_datagram), + ) + + return True diff --git a/deepgram/clients/live/v1/client.py b/deepgram/clients/live/v1/client.py index 8e554582..06ee031b 100644 --- a/deepgram/clients/live/v1/client.py +++ b/deepgram/clients/live/v1/client.py @@ -6,6 +6,7 @@ import time import logging from typing import Dict, Union, Optional, cast, Any +from datetime import datetime from websockets.sync.client import connect, ClientConnection import websockets @@ -29,6 +30,7 @@ from .options import LiveOptions ONE_SECOND = 1 +HALF_SECOND = 0.5 DEEPGRAM_INTERVAL = 5 PING_INTERVAL = 20 @@ -51,9 +53,14 @@ class LiveClient: # pylint: disable=too-many-instance-attributes _socket: ClientConnection _exit_event: threading.Event _lock_send: threading.Lock + _lock_flush: threading.Lock _event_handlers: Dict[LiveTranscriptionEvents, list] - _listen_thread: threading.Thread - _keep_alive_thread: threading.Thread + + _last_datagram: Optional[datetime] = None + + _listen_thread: Union[threading.Thread, None] + _keep_alive_thread: Union[threading.Thread, None] + _flush_thread: Union[threading.Thread, None] _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -70,8 +77,20 @@ def __init__(self, config: DeepgramClientOptions): self._config = config self._endpoint = "v1/listen" - self._exit_event = threading.Event() self._lock_send = threading.Lock() + + self._flush_thread = None + self._keep_alive_thread = None + self._listen_thread = None + + # exit + self._exit_event = threading.Event() + + # auto flush + self._last_datagram = None + self._flush_event = threading.Event() + self._lock_flush = threading.Lock() + self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() } @@ -122,7 +141,7 @@ def start( else: self._options = {} - combined_options: Dict = self._options + combined_options = self._options if self._addons is not None: self._logger.info("merging addons to options") combined_options.update(self._addons) @@ -146,13 +165,21 @@ def start( self._listen_thread.start() # keepalive thread - if self._config.options.get("keepalive") == "true": + if self._config.is_keep_alive_enabled(): self._logger.notice("keepalive is enabled") self._keep_alive_thread = threading.Thread(target=self._keep_alive) self._keep_alive_thread.start() else: self._logger.notice("keepalive is disabled") + # flush thread + if self._config.is_auto_flush_enabled(): + self._logger.notice("autoflush is enabled") + self._flush_thread = threading.Thread(target=self._flush) + self._flush_thread.start() + else: + self._logger.notice("autoflush is disabled") + # push open event self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Open), @@ -189,7 +216,7 @@ def on( """ Registers event handlers for specific events. """ - self._logger.info("event fired: %s", event) + self._logger.info("event subscribed: %s", event) if event in LiveTranscriptionEvents.__members__.values() and callable(handler): self._event_handlers[event].append(handler) @@ -197,10 +224,11 @@ def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: """ Emits events to the registered event handlers. """ + self._logger.debug("callback handlers for: %s", event) for handler in self._event_handlers[event]: handler(self, *args, **kwargs) - # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches def _listening( self, ) -> None: @@ -245,6 +273,13 @@ def _listening( message ) self._logger.verbose("LiveResultResponse: %s", msg_result) + + # auto flush + if self._config.is_inspecting_messages(): + inspect_res = self._inspect(msg_result) + if not inspect_res: + self._logger.error("inspect_res failed") + self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), result=msg_result, @@ -405,8 +440,8 @@ def _keep_alive(self) -> None: while True: try: counter += 1 - self._exit_event.wait(timeout=ONE_SECOND) + if self._exit_event.is_set(): self._logger.notice("_keep_alive exiting gracefully") self._logger.debug("LiveClient._keep_alive LEAVE") @@ -419,8 +454,7 @@ def _keep_alive(self) -> None: # deepgram keepalive if counter % DEEPGRAM_INTERVAL == 0: - self._logger.verbose("Sending KeepAlive...") - self.send(json.dumps({"type": "KeepAlive"})) + self.keep_alive() except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice(f"_keep_alive({e.code}) exiting gracefully") @@ -501,6 +535,126 @@ def _keep_alive(self) -> None: # pylint: enable=too-many-return-statements + ## pylint: disable=too-many-return-statements,too-many-statements + def _flush(self) -> None: + self._logger.debug("LiveClient._flush ENTER") + + delta_in_ms_str = self._config.options.get("auto_flush_reply_delta") + if delta_in_ms_str is None: + self._logger.error("auto_flush_reply_delta is None") + self._logger.debug("LiveClient._flush LEAVE") + return + delta_in_ms = float(delta_in_ms_str) + + while True: + try: + self._flush_event.wait(timeout=HALF_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_flush exiting gracefully") + self._logger.debug("LiveClient._flush LEAVE") + return + + if self._socket is None: + self._logger.debug("socket is None, exiting flush") + self._logger.debug("LiveClient._flush LEAVE") + return + + with self._lock_flush: + if self._last_datagram is None: + self._logger.debug("AutoFlush last_datagram is None") + continue + + delta = datetime.now() - self._last_datagram + diff_in_ms = delta.total_seconds() * 1000 + self._logger.debug("AutoFlush delta: %f", diff_in_ms) + if diff_in_ms < delta_in_ms: + self._logger.debug("AutoFlush delta is less than threshold") + continue + + with self._lock_flush: + self._last_datagram = None + self.finalize() + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("LiveClient._flush LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code == 1000: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("LiveClient._flush LEAVE") + return + + self._logger.error( + "ConnectionClosed in LiveClient._flush with code %s: %s", + e.code, + e.reason, + ) + cc_error: ErrorResponse = ErrorResponse( + "ConnectionClosed in LiveClient._flush", + f"{e}", + "ConnectionClosed", + ) + self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), cc_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("LiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in LiveClient._flush with: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in LiveClient._flush", + f"{e}", + "WebSocketException", + ) + self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), ws_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("LiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in LiveClient._flush: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in LiveClient._flush", + f"{e}", + "Exception", + ) + self._logger.error("Exception in LiveClient._flush: %s", str(e)) + self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), e_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("LiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements + # pylint: disable=too-many-return-statements def send(self, data: Union[str, bytes]) -> bool: """ @@ -561,6 +715,35 @@ def send(self, data: Union[str, bytes]) -> bool: # pylint: enable=too-many-return-statements + def keep_alive(self) -> bool: + """ + Sends a KeepAlive message + """ + self._logger.spam("LiveClient.keep_alive ENTER") + + if self._exit_event.is_set(): + self._logger.notice("keep_alive exiting gracefully") + self._logger.debug("LiveClient.keep_alive LEAVE") + return False + + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("LiveClient.keep_alive LEAVE") + return False + + self._logger.notice("Sending KeepAlive...") + ret = self.send(json.dumps({"type": "KeepAlive"})) + + if not ret: + self._logger.error("keep_alive failed") + self._logger.spam("LiveClient.keep_alive LEAVE") + return False + + self._logger.notice("keep_alive succeeded") + self._logger.spam("LiveClient.keep_alive LEAVE") + + return True + def finalize(self) -> bool: """ Finalizes the Transcript connection by flushing it @@ -572,14 +755,18 @@ def finalize(self) -> bool: self._logger.debug("LiveClient.finalize LEAVE") return False - if self._socket is not None: - self._logger.notice("sending Finalize...") - ret = self.send(json.dumps({"type": "Finalize"})) + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("LiveClient.finalize LEAVE") + return False + + self._logger.notice("Sending Finalize...") + ret = self.send(json.dumps({"type": "Finalize"})) - if not ret: - self._logger.error("finalize failed") - self._logger.spam("LiveClient.finalize LEAVE") - return False + if not ret: + self._logger.error("finalize failed") + self._logger.spam("LiveClient.finalize LEAVE") + return False self._logger.notice("finalize succeeded") self._logger.spam("LiveClient.finalize LEAVE") @@ -598,15 +785,19 @@ def finish(self) -> bool: # stop the threads self._logger.verbose("cancelling tasks...") - if self._config.options.get("keepalive") == "true": - if self._keep_alive_thread is not None: - self._keep_alive_thread.join() - self._keep_alive_thread = None # type: ignore - self._logger.notice("processing thread joined") + if self._flush_thread is not None: + self._flush_thread.join() + self._flush_thread = None + self._logger.notice("processing _flush_thread thread joined") + + if self._keep_alive_thread is not None: + self._keep_alive_thread.join() + self._keep_alive_thread = None + self._logger.notice("processing _keep_alive_thread thread joined") if self._listen_thread is not None: self._listen_thread.join() - self._listen_thread = None # type: ignore + self._listen_thread = None self._logger.notice("listening thread joined") self._logger.notice("finish succeeded") @@ -656,3 +847,22 @@ def _signal_exit(self) -> None: self._logger.error("socket.wait_closed failed: %s", e) self._socket = None # type: ignore + + def _inspect(self, msg_result: LiveResultResponse) -> bool: + sentence = msg_result.channel.alternatives[0].transcript + if len(sentence) == 0: + return True + + if msg_result.is_final: + with self._lock_flush: + self._logger.debug("AutoFlush is_final received") + self._last_datagram = None + else: + with self._lock_flush: + self._last_datagram = datetime.now() + self._logger.debug( + "AutoFlush interim received: %s", + str(self._last_datagram), + ) + + return True diff --git a/deepgram/options.py b/deepgram/options.py index 03a8b569..a75959e6 100644 --- a/deepgram/options.py +++ b/deepgram/options.py @@ -7,6 +7,7 @@ import os from typing import Dict, Optional import logging +import numbers from deepgram import __version__ from deepgram.utils import verboselogs @@ -27,6 +28,9 @@ class DeepgramClientOptions: options: (Optional) Additional options for initializing the client. """ + _logger: verboselogs.VerboseLogger + _inspect: bool + def __init__( self, api_key: str = "", @@ -35,14 +39,17 @@ def __init__( headers: Optional[Dict] = None, options: Optional[Dict] = None, ): - self.logger = verboselogs.VerboseLogger(__name__) - self.logger.addHandler(logging.StreamHandler()) + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) if api_key is None: api_key = "" self.verbose = verbose self.api_key = api_key + + if headers is None: + headers = {} self._update_headers(headers=headers) if len(url) == 0: @@ -53,6 +60,10 @@ def __init__( options = {} self.options = options + self._inspect = False + if self.is_auto_flush_enabled(): + self._inspect = True + def set_apikey(self, api_key: str): """ set_apikey: Sets the API key for the client. @@ -63,7 +74,7 @@ def set_apikey(self, api_key: str): self.api_key = api_key self._update_headers() - def _get_url(self, url): + def _get_url(self, url) -> str: if not re.match(r"^https?://", url, re.IGNORECASE): url = "https://" + url return url.strip("/") @@ -82,6 +93,27 @@ def _update_headers(self, headers: Optional[Dict] = None): if headers: self.headers.update(headers) + def is_keep_alive_enabled(self) -> bool: + """ + is_keep_alive_enabled: Returns True if the client is configured to keep the connection alive. + """ + keep_alive_org = self.options.get("keepalive", False) + keep_alive_new = self.options.get("keep_alive", False) + return keep_alive_org or keep_alive_new + + def is_auto_flush_enabled(self) -> bool: + """ + is_auto_flush_enabled: Returns True if the client is configured to auto-flush. + """ + auto_flush_delta = float(self.options.get("auto_flush_reply_delta", 0)) + return isinstance(auto_flush_delta, numbers.Number) and auto_flush_delta > 0 + + def is_inspecting_messages(self) -> bool: + """ + is_inspecting_messages: Returns True if the client is inspecting messages. + """ + return self._inspect + class ClientOptionsFromEnv( DeepgramClientOptions @@ -90,6 +122,8 @@ class ClientOptionsFromEnv( This class extends DeepgramClientOptions and will attempt to use environment variables first before defaults. """ + _logger: verboselogs.VerboseLogger + def __init__( self, api_key: str = "", @@ -98,9 +132,9 @@ def __init__( headers: Optional[Dict] = None, options: Optional[Dict] = None, ): - self.logger = verboselogs.VerboseLogger(__name__) - self.logger.addHandler(logging.StreamHandler()) - self.logger.setLevel(verboselogs.WARNING) # temporary set for setup + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(verboselogs.WARNING) # temporary set for setup if api_key is None: api_key = "" @@ -108,12 +142,12 @@ def __init__( if api_key == "": api_key = os.getenv("DEEPGRAM_API_KEY", "") if api_key == "": - self.logger.critical("Deepgram API KEY is not set") + self._logger.critical("Deepgram API KEY is not set") raise DeepgramApiKeyError("Deepgram API KEY is not set") if url == "": url = os.getenv("DEEPGRAM_HOST", "api.deepgram.com") - self.logger.notice(f"Deepgram host is set to {url}") + self._logger.notice(f"Deepgram host is set to {url}") if verbose == verboselogs.WARNING: _loglevel = os.getenv("DEEPGRAM_LOGGING", "") @@ -122,36 +156,36 @@ def __init__( if isinstance(verbose, str): match verbose: case "NOTSET": - self.logger.notice("Logging level is set to NOTSET") + self._logger.notice("Logging level is set to NOTSET") verbose = verboselogs.NOTSET case "SPAM": - self.logger.notice("Logging level is set to SPAM") + self._logger.notice("Logging level is set to SPAM") verbose = verboselogs.SPAM case "DEBUG": - self.logger.notice("Logging level is set to DEBUG") + self._logger.notice("Logging level is set to DEBUG") verbose = verboselogs.DEBUG case "VERBOSE": - self.logger.notice("Logging level is set to VERBOSE") + self._logger.notice("Logging level is set to VERBOSE") verbose = verboselogs.VERBOSE case "NOTICE": - self.logger.notice("Logging level is set to NOTICE") + self._logger.notice("Logging level is set to NOTICE") verbose = verboselogs.NOTICE case "WARNING": - self.logger.notice("Logging level is set to WARNING") + self._logger.notice("Logging level is set to WARNING") verbose = verboselogs.WARNING case "SUCCESS": - self.logger.notice("Logging level is set to SUCCESS") + self._logger.notice("Logging level is set to SUCCESS") verbose = verboselogs.SUCCESS case "ERROR": - self.logger.notice("Logging level is set to ERROR") + self._logger.notice("Logging level is set to ERROR") verbose = verboselogs.ERROR case "CRITICAL": - self.logger.notice("Logging level is set to CRITICAL") + self._logger.notice("Logging level is set to CRITICAL") verbose = verboselogs.CRITICAL case _: - self.logger.notice("Logging level is set to WARNING") + self._logger.notice("Logging level is set to WARNING") verbose = verboselogs.WARNING - self.logger.notice(f"Logging level is set to {verbose}") + self._logger.notice(f"Logging level is set to {verbose}") if headers is None: headers = {} @@ -159,7 +193,7 @@ def __init__( header = os.getenv(f"DEEPGRAM_HEADER_{x}", None) if header is not None: headers[header] = os.getenv(f"DEEPGRAM_HEADER_VALUE_{x}", None) - self.logger.debug( + self._logger.debug( "Deepgram header %s is set with value %s", header, headers[header], @@ -167,7 +201,7 @@ def __init__( else: break if len(headers) == 0: - self.logger.notice("Deepgram headers are not set") + self._logger.notice("Deepgram headers are not set") headers = None if options is None: @@ -176,13 +210,13 @@ def __init__( param = os.getenv(f"DEEPGRAM_PARAM_{x}", None) if param is not None: options[param] = os.getenv(f"DEEPGRAM_PARAM_VALUE_{x}", None) - self.logger.debug( + self._logger.debug( "Deepgram option %s is set with value %s", param, options[param] ) else: break if len(options) == 0: - self.logger.notice("Deepgram options are not set") + self._logger.notice("Deepgram options are not set") options = None super().__init__( diff --git a/tests/edge_cases/auto_flush/async_microphone_mute/main.py b/tests/edge_cases/auto_flush/async_microphone_mute/main.py new file mode 100644 index 00000000..2793a426 --- /dev/null +++ b/tests/edge_cases/auto_flush/async_microphone_mute/main.py @@ -0,0 +1,174 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from signal import SIGINT, SIGTERM +import asyncio +from dotenv import load_dotenv +import logging +from deepgram.utils import verboselogs +from time import sleep + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + LiveTranscriptionEvents, + LiveOptions, + Microphone, +) + +load_dotenv() + +# We will collect the is_final=true messages here so we can use them when the person finishes speaking +is_finals = [] + + +async def main(): + try: + loop = asyncio.get_event_loop() + + for signal in (SIGTERM, SIGINT): + loop.add_signal_handler( + signal, + lambda: asyncio.create_task( + shutdown(signal, loop, dg_connection, microphone) + ), + ) + + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + config = DeepgramClientOptions( + verbose=verboselogs.DEBUG, + options={ + "keepalive": "true", + "auto_flush_reply_delta": 2000, + }, + ) + deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + # deepgram: DeepgramClient = DeepgramClient() + + dg_connection = deepgram.listen.asynclive.v("1") + + async def on_open(self, open, **kwargs): + print(f"Connection Open") + + async def on_message(self, result, **kwargs): + global is_finals + sentence = result.channel.alternatives[0].transcript + if len(sentence) == 0: + return + if result.is_final: + # We need to collect these and concatenate them together when we get a speech_final=true + # See docs: https://developers.deepgram.com/docs/understand-endpointing-interim-results + is_finals.append(sentence) + + # Speech Final means we have detected sufficent silence to consider this end of speech + # Speech final is the lowest latency result as it triggers as soon an the endpointing value has triggered + if result.speech_final: + utterance = " ".join(is_finals) + print(f"Speech Final: {utterance}") + is_finals = [] + else: + # These are useful if you need real time captioning and update what the Interim Results produced + print(f"Is Final: {sentence}") + else: + # These are useful if you need real time captioning of what is being spoken + print(f"Interim Results: {sentence}") + + async def on_metadata(self, metadata, **kwargs): + print(f"Metadata: {metadata}") + + async def on_speech_started(self, speech_started, **kwargs): + print(f"Speech Started") + + async def on_utterance_end(self, utterance_end, **kwargs): + print(f"Utterance End") + global is_finals + if len(is_finals) > 0: + utterance = " ".join(is_finals) + print(f"Utterance End: {utterance}") + is_finals = [] + + async def on_close(self, close, **kwargs): + print(f"Connection Closed") + + async def on_error(self, error, **kwargs): + print(f"Handled Error: {error}") + + async def on_unhandled(self, unhandled, **kwargs): + print(f"Unhandled Websocket Message: {unhandled}") + + dg_connection.on(LiveTranscriptionEvents.Open, on_open) + dg_connection.on(LiveTranscriptionEvents.Transcript, on_message) + dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata) + dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started) + dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end) + dg_connection.on(LiveTranscriptionEvents.Close, on_close) + dg_connection.on(LiveTranscriptionEvents.Error, on_error) + dg_connection.on(LiveTranscriptionEvents.Unhandled, on_unhandled) + + # connect to websocket + options: LiveOptions = LiveOptions( + model="nova-2", + language="en-US", + # Apply smart formatting to the output + smart_format=True, + # Raw audio format deatils + encoding="linear16", + channels=1, + sample_rate=16000, + # To get UtteranceEnd, the following must be set: + interim_results=True, + utterance_end_ms="1000", + vad_events=True, + # Time in milliseconds of silence to wait for before finalizing speech + endpointing=300, + ) + + addons = { + # Prevent waiting for additional numbers + "no_delay": "true" + } + + print("\n\nStart talking! Press Ctrl+C to stop...\n") + if await dg_connection.start(options, addons=addons) is False: + print("Failed to connect to Deepgram") + return + + # Open a microphone stream on the default input device + microphone = Microphone(dg_connection.send) + + # start microphone + microphone.start() + + # wait until cancelled + try: + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + # This block will be executed when the shutdown coroutine cancels all tasks + pass + finally: + microphone.finish() + await dg_connection.finish() + + print("Finished") + + except Exception as e: + print(f"Could not open socket: {e}") + return + + +async def shutdown(signal, loop, dg_connection, microphone): + print(f"Received exit signal {signal.name}...") + microphone.finish() + await dg_connection.finish() + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + [task.cancel() for task in tasks] + print(f"Cancelling {len(tasks)} outstanding tasks") + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + print("Shutdown complete.") + + +asyncio.run(main()) diff --git a/tests/edge_cases/auto_flush/microphone_mute/main.py b/tests/edge_cases/auto_flush/microphone_mute/main.py new file mode 100644 index 00000000..44177be6 --- /dev/null +++ b/tests/edge_cases/auto_flush/microphone_mute/main.py @@ -0,0 +1,148 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from dotenv import load_dotenv +import logging +from deepgram.utils import verboselogs +from time import sleep + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + LiveTranscriptionEvents, + LiveOptions, + Microphone, +) + +load_dotenv() + +# We will collect the is_final=true messages here so we can use them when the person finishes speaking +is_finals = [] + + +def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + config = DeepgramClientOptions( + verbose=verboselogs.DEBUG, + options={ + "keepalive": "true", + "auto_flush_reply_delta": 2000, + }, + ) + deepgram: DeepgramClient = DeepgramClient("", config) + + dg_connection = deepgram.listen.live.v("1") + + def on_open(self, open, **kwargs): + print(f"Connection Open") + + def on_message(self, result, **kwargs): + global is_finals + sentence = result.channel.alternatives[0].transcript + if len(sentence) == 0: + return + if result.is_final: + # We need to collect these and concatenate them together when we get a speech_final=true + # See docs: https://developers.deepgram.com/docs/understand-endpointing-interim-results + is_finals.append(sentence) + + # Speech Final means we have detected sufficent silence to consider this end of speech + # Speech final is the lowest latency result as it triggers as soon an the endpointing value has triggered + if result.speech_final: + utterance = " ".join(is_finals) + print(f"Speech Final: {utterance}") + is_finals = [] + else: + # These are useful if you need real time captioning and update what the Interim Results produced + print(f"Is Final: {sentence}") + else: + # These are useful if you need real time captioning of what is being spoken + print(f"Interim Results: {sentence}") + + def on_metadata(self, metadata, **kwargs): + print(f"Metadata: {metadata}") + + def on_speech_started(self, speech_started, **kwargs): + print(f"Speech Started") + + def on_utterance_end(self, utterance_end, **kwargs): + print(f"Utterance End") + global is_finals + if len(is_finals) > 0: + utterance = " ".join(is_finals) + print(f"Utterance End: {utterance}") + is_finals = [] + + def on_close(self, close, **kwargs): + print(f"Connection Closed") + + def on_error(self, error, **kwargs): + print(f"Handled Error: {error}") + + def on_unhandled(self, unhandled, **kwargs): + print(f"Unhandled Websocket Message: {unhandled}") + + dg_connection.on(LiveTranscriptionEvents.Open, on_open) + dg_connection.on(LiveTranscriptionEvents.Transcript, on_message) + dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata) + dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started) + dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end) + dg_connection.on(LiveTranscriptionEvents.Close, on_close) + dg_connection.on(LiveTranscriptionEvents.Error, on_error) + dg_connection.on(LiveTranscriptionEvents.Unhandled, on_unhandled) + + options: LiveOptions = LiveOptions( + model="nova-2", + language="en-US", + # Apply smart formatting to the output + smart_format=True, + # Raw audio format details + encoding="linear16", + channels=1, + sample_rate=16000, + # To get UtteranceEnd, the following must be set: + interim_results=True, + utterance_end_ms="1000", + vad_events=True, + # Time in milliseconds of silence to wait for before finalizing speech + endpointing=300, + ) + + addons = { + # Prevent waiting for additional numbers + "no_delay": "true" + } + + print("\n\nPress Enter to stop recording...\n\n") + if dg_connection.start(options, addons=addons) is False: + print("Failed to connect to Deepgram") + return + + # Open a microphone stream on the default input device + microphone = Microphone(dg_connection.send) + + # start microphone + microphone.start() + + # wait until finished + input("") + + # Wait for the microphone to close + microphone.finish() + + # Indicate that we've finished + dg_connection.finish() + + print("Finished") + # sleep(30) # wait 30 seconds to see if there is any additional socket activity + # print("Really done!") + + except Exception as e: + print(f"Could not open socket: {e}") + return + + +if __name__ == "__main__": + main()