diff --git a/deepgram/clients/live/v1/async_client.py b/deepgram/clients/live/v1/async_client.py index 36facb3d..52d6501b 100644 --- a/deepgram/clients/live/v1/async_client.py +++ b/deepgram/clients/live/v1/async_client.py @@ -5,6 +5,7 @@ import json import logging from typing import Dict, Union, Optional, cast, Any +from datetime import datetime import websockets from websockets.client import WebSocketClientProtocol @@ -28,6 +29,7 @@ from .options import LiveOptions ONE_SECOND = 1 +HALF_SECOND = 0.5 DEEPGRAM_INTERVAL = 5 PING_INTERVAL = 20 @@ -49,8 +51,12 @@ class AsyncLiveClient: # pylint: disable=too-many-instance-attributes _socket: WebSocketClientProtocol _event_handlers: Dict[LiveTranscriptionEvents, list] - _listen_thread: asyncio.Task - _keep_alive_thread: asyncio.Task + + _last_datagram: Optional[datetime] = None + + _listen_thread: Union[asyncio.Task, None] + _keep_alive_thread: Union[asyncio.Task, None] + _flush_thread: Union[asyncio.Task, None] _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -67,7 +73,16 @@ def __init__(self, config: DeepgramClientOptions): self._config = config self._endpoint = "v1/listen" + + self._listen_thread = None + self._keep_alive_thread = None + self._flush_thread = None + + # exit self._exit_event = asyncio.Event() + + # auto flush + self._flush_event = asyncio.Event() self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() } @@ -112,7 +127,7 @@ async def start( if isinstance(options, LiveOptions): self._logger.info("LiveOptions switching class -> dict") - self._options = cast(Dict[str, str], options.to_dict()) + self._options = options.to_dict() elif options is not None: self._options = options else: @@ -146,12 +161,19 @@ async def start( self._listen_thread = asyncio.create_task(self._listening()) # keepalive thread - if self._config.options.get("keepalive") == "true": + if self._config.is_keep_alive_enabled(): self._logger.notice("keepalive is enabled") self._keep_alive_thread = asyncio.create_task(self._keep_alive()) else: self._logger.notice("keepalive is disabled") + # flush thread + if self._config.is_auto_flush_enabled(): + self._logger.notice("autoflush is enabled") + self._flush_thread = asyncio.create_task(self._flush()) + else: + self._logger.notice("autoflush is disabled") + # push open event await self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Open), @@ -186,7 +208,7 @@ def on(self, event: LiveTranscriptionEvents, handler) -> None: """ Registers event handlers for specific events. """ - self._logger.info("event fired: %s", event) + self._logger.info("event subscribed: %s", event) if event in LiveTranscriptionEvents.__members__.values() and callable(handler): self._event_handlers[event].append(handler) @@ -195,13 +217,14 @@ async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: """ Emits events to the registered event handlers. """ + self._logger.debug("callback handlers for: %s", event) for handler in self._event_handlers[event]: if asyncio.iscoroutinefunction(handler): await handler(self, *args, **kwargs) else: asyncio.create_task(handler(self, *args, **kwargs)) - # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches async def _listening(self) -> None: """ Listens for messages from the WebSocket connection. @@ -244,6 +267,13 @@ async def _listening(self) -> None: message ) self._logger.verbose("LiveResultResponse: %s", msg_result) + + # auto flush + if self._config.is_inspecting_messages(): + inspect_res = await self._inspect(msg_result) + if not inspect_res: + self._logger.error("inspect_res failed") + await self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), result=msg_result, @@ -426,8 +456,7 @@ async def _keep_alive(self) -> None: # deepgram keepalive if counter % DEEPGRAM_INTERVAL == 0: - self._logger.verbose("Sending KeepAlive...") - await self.send(json.dumps({"type": "KeepAlive"})) + await self.keep_alive() except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice(f"_keep_alive({e.code}) exiting gracefully") @@ -514,6 +543,132 @@ async def _keep_alive(self) -> None: raise return + ## pylint: disable=too-many-return-statements,too-many-statements + async def _flush(self) -> None: + self._logger.debug("AsyncLiveClient._flush ENTER") + + delta_in_ms_str = self._config.options.get("auto_flush_reply_delta") + if delta_in_ms_str is None: + self._logger.error("auto_flush_reply_delta is None") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + delta_in_ms = float(delta_in_ms_str) + + while True: + try: + await asyncio.sleep(HALF_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_flush exiting gracefully") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + + if self._socket is None: + self._logger.notice("socket is None, exiting flush") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + + if self._last_datagram is None: + self._logger.debug("AutoFlush last_datagram is None") + continue + + delta = datetime.now() - self._last_datagram + diff_in_ms = delta.total_seconds() * 1000 + self._logger.debug("AutoFlush delta: %f", diff_in_ms) + if diff_in_ms < delta_in_ms: + self._logger.debug("AutoFlush delta is less than threshold") + continue + + self._last_datagram = None + await self.finalize() + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code == 1000: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("AsyncLiveClient._flush LEAVE") + return + + self._logger.error( + "ConnectionClosed in AsyncLiveClient._flush with code %s: %s", + e.code, + e.reason, + ) + cc_error: ErrorResponse = ErrorResponse( + "ConnectionClosed in AsyncLiveClient._flush", + f"{e}", + "ConnectionClosed", + ) + await self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), + error=cc_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncLiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncLiveClient._flush: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in AsyncLiveClient._flush", + f"{e}", + "Exception", + ) + await self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncLiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in AsyncLiveClient._flush: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncLiveClient._flush", + f"{e}", + "Exception", + ) + self._logger.error("Exception in AsyncLiveClient._flush: %s", str(e)) + await self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncLiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements + + # pylint: disable=too-many-return-statements + async def send(self, data: Union[str, bytes]) -> bool: """ Sends data over the WebSocket connection. @@ -570,6 +725,31 @@ async def send(self, data: Union[str, bytes]) -> bool: # pylint: enable=too-many-return-statements + async def keep_alive(self) -> bool: + """ + Sends a KeepAlive message + """ + self._logger.spam("AsyncLiveClient.keep_alive ENTER") + + if self._exit_event.is_set(): + self._logger.notice("keep_alive exiting gracefully") + self._logger.debug("AsyncLiveClient.keep_alive LEAVE") + return False + + if self._socket is not None: + self._logger.notice("Sending KeepAlive...") + ret = await self.send(json.dumps({"type": "KeepAlive"})) + + if not ret: + self._logger.error("keep_alive failed") + self._logger.spam("AsyncLiveClient.keep_alive LEAVE") + return False + + self._logger.notice("keep_alive succeeded") + self._logger.spam("AsyncLiveClient.keep_alive LEAVE") + + return True + async def finalize(self) -> bool: """ Finalizes the Transcript connection by flushing it @@ -582,7 +762,7 @@ async def finalize(self) -> bool: return False if self._socket is not None: - self._logger.notice("sending Finalize...") + self._logger.notice("Sending Finalize...") ret = await self.send(json.dumps({"type": "Finalize"})) if not ret: @@ -609,13 +789,20 @@ async def finish(self) -> bool: try: # Before cancelling, check if the tasks were created tasks = [] - if self._config.options.get("keepalive") == "true": - if self._keep_alive_thread is not None: - self._keep_alive_thread.cancel() - tasks.append(self._keep_alive_thread) + if self._keep_alive_thread is not None: + self._keep_alive_thread.cancel() + tasks.append(self._keep_alive_thread) + self._logger.notice("processing _keep_alive_thread cancel...") + + if self._flush_thread is not None: + self._flush_thread.cancel() + tasks.append(self._flush_thread) + self._logger.notice("processing _flush_thread cancel...") + if self._listen_thread is not None: self._listen_thread.cancel() tasks.append(self._listen_thread) + self._logger.notice("processing _listen_thread cancel...") # Use asyncio.gather to wait for tasks to be cancelled await asyncio.gather(*filter(None, tasks), return_exceptions=True) @@ -673,3 +860,20 @@ async def _signal_exit(self) -> None: self._logger.error("socket.wait_closed failed: %s", e) self._socket = None # type: ignore + + async def _inspect(self, msg_result: LiveResultResponse) -> bool: + sentence = msg_result.channel.alternatives[0].transcript + if len(sentence) == 0: + return True + + if msg_result.is_final: + self._logger.debug("AutoFlush is_final received") + self._last_datagram = None + else: + self._last_datagram = datetime.now() + self._logger.debug( + "AutoFlush interim received: %s", + str(self._last_datagram), + ) + + return True diff --git a/deepgram/clients/live/v1/client.py b/deepgram/clients/live/v1/client.py index 8e554582..37bb5f58 100644 --- a/deepgram/clients/live/v1/client.py +++ b/deepgram/clients/live/v1/client.py @@ -6,6 +6,7 @@ import time import logging from typing import Dict, Union, Optional, cast, Any +from datetime import datetime from websockets.sync.client import connect, ClientConnection import websockets @@ -29,6 +30,7 @@ from .options import LiveOptions ONE_SECOND = 1 +HALF_SECOND = 0.5 DEEPGRAM_INTERVAL = 5 PING_INTERVAL = 20 @@ -51,9 +53,14 @@ class LiveClient: # pylint: disable=too-many-instance-attributes _socket: ClientConnection _exit_event: threading.Event _lock_send: threading.Lock + _lock_flush: threading.Lock _event_handlers: Dict[LiveTranscriptionEvents, list] - _listen_thread: threading.Thread - _keep_alive_thread: threading.Thread + + _last_datagram: Optional[datetime] = None + + _listen_thread: Union[threading.Thread, None] + _keep_alive_thread: Union[threading.Thread, None] + _flush_thread: Union[threading.Thread, None] _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -70,8 +77,20 @@ def __init__(self, config: DeepgramClientOptions): self._config = config self._endpoint = "v1/listen" - self._exit_event = threading.Event() self._lock_send = threading.Lock() + + self._flush_thread = None + self._keep_alive_thread = None + self._listen_thread = None + + # exit + self._exit_event = threading.Event() + + # auto flush + self._last_datagram = None + self._flush_event = threading.Event() + self._lock_flush = threading.Lock() + self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() } @@ -122,7 +141,7 @@ def start( else: self._options = {} - combined_options: Dict = self._options + combined_options = self._options if self._addons is not None: self._logger.info("merging addons to options") combined_options.update(self._addons) @@ -146,13 +165,21 @@ def start( self._listen_thread.start() # keepalive thread - if self._config.options.get("keepalive") == "true": + if self._config.is_keep_alive_enabled(): self._logger.notice("keepalive is enabled") self._keep_alive_thread = threading.Thread(target=self._keep_alive) self._keep_alive_thread.start() else: self._logger.notice("keepalive is disabled") + # flush thread + if self._config.is_auto_flush_enabled(): + self._logger.notice("autoflush is enabled") + self._flush_thread = threading.Thread(target=self._flush) + self._flush_thread.start() + else: + self._logger.notice("autoflush is disabled") + # push open event self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Open), @@ -189,7 +216,7 @@ def on( """ Registers event handlers for specific events. """ - self._logger.info("event fired: %s", event) + self._logger.info("event subscribed: %s", event) if event in LiveTranscriptionEvents.__members__.values() and callable(handler): self._event_handlers[event].append(handler) @@ -197,10 +224,11 @@ def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: """ Emits events to the registered event handlers. """ + self._logger.debug("callback handlers for: %s", event) for handler in self._event_handlers[event]: handler(self, *args, **kwargs) - # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches def _listening( self, ) -> None: @@ -245,6 +273,13 @@ def _listening( message ) self._logger.verbose("LiveResultResponse: %s", msg_result) + + # auto flush + if self._config.is_inspecting_messages(): + inspect_res = self._inspect(msg_result) + if not inspect_res: + self._logger.error("inspect_res failed") + self._emit( LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), result=msg_result, @@ -405,8 +440,8 @@ def _keep_alive(self) -> None: while True: try: counter += 1 - self._exit_event.wait(timeout=ONE_SECOND) + if self._exit_event.is_set(): self._logger.notice("_keep_alive exiting gracefully") self._logger.debug("LiveClient._keep_alive LEAVE") @@ -419,8 +454,7 @@ def _keep_alive(self) -> None: # deepgram keepalive if counter % DEEPGRAM_INTERVAL == 0: - self._logger.verbose("Sending KeepAlive...") - self.send(json.dumps({"type": "KeepAlive"})) + self.keep_alive() except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice(f"_keep_alive({e.code}) exiting gracefully") @@ -501,6 +535,126 @@ def _keep_alive(self) -> None: # pylint: enable=too-many-return-statements + ## pylint: disable=too-many-return-statements,too-many-statements + def _flush(self) -> None: + self._logger.debug("LiveClient._flush ENTER") + + delta_in_ms_str = self._config.options.get("auto_flush_reply_delta") + if delta_in_ms_str is None: + self._logger.error("auto_flush_reply_delta is None") + self._logger.debug("LiveClient._flush LEAVE") + return + delta_in_ms = float(delta_in_ms_str) + + while True: + try: + self._flush_event.wait(timeout=HALF_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_flush exiting gracefully") + self._logger.debug("LiveClient._flush LEAVE") + return + + if self._socket is None: + self._logger.debug("socket is None, exiting flush") + self._logger.debug("LiveClient._flush LEAVE") + return + + with self._lock_flush: + if self._last_datagram is None: + self._logger.debug("AutoFlush last_datagram is None") + continue + + delta = datetime.now() - self._last_datagram + diff_in_ms = delta.total_seconds() * 1000 + self._logger.debug("AutoFlush delta: %f", diff_in_ms) + if diff_in_ms < delta_in_ms: + self._logger.debug("AutoFlush delta is less than threshold") + continue + + with self._lock_flush: + self._last_datagram = None + self.finalize() + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("LiveClient._flush LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code == 1000: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("LiveClient._flush LEAVE") + return + + self._logger.error( + "ConnectionClosed in LiveClient._flush with code %s: %s", + e.code, + e.reason, + ) + cc_error: ErrorResponse = ErrorResponse( + "ConnectionClosed in LiveClient._flush", + f"{e}", + "ConnectionClosed", + ) + self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), cc_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("LiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in LiveClient._flush with: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in LiveClient._flush", + f"{e}", + "WebSocketException", + ) + self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), ws_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("LiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in LiveClient._flush: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in LiveClient._flush", + f"{e}", + "Exception", + ) + self._logger.error("Exception in LiveClient._flush: %s", str(e)) + self._emit( + LiveTranscriptionEvents(LiveTranscriptionEvents.Error), e_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("LiveClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements + # pylint: disable=too-many-return-statements def send(self, data: Union[str, bytes]) -> bool: """ @@ -561,6 +715,31 @@ def send(self, data: Union[str, bytes]) -> bool: # pylint: enable=too-many-return-statements + def keep_alive(self) -> bool: + """ + Sends a KeepAlive message + """ + self._logger.spam("LiveClient.keep_alive ENTER") + + if self._exit_event.is_set(): + self._logger.notice("keep_alive exiting gracefully") + self._logger.debug("LiveClient.keep_alive LEAVE") + return False + + if self._socket is not None: + self._logger.notice("Sending KeepAlive...") + ret = self.send(json.dumps({"type": "KeepAlive"})) + + if not ret: + self._logger.error("keep_alive failed") + self._logger.spam("LiveClient.keep_alive LEAVE") + return False + + self._logger.notice("keep_alive succeeded") + self._logger.spam("LiveClient.keep_alive LEAVE") + + return True + def finalize(self) -> bool: """ Finalizes the Transcript connection by flushing it @@ -573,7 +752,7 @@ def finalize(self) -> bool: return False if self._socket is not None: - self._logger.notice("sending Finalize...") + self._logger.notice("Sending Finalize...") ret = self.send(json.dumps({"type": "Finalize"})) if not ret: @@ -598,15 +777,19 @@ def finish(self) -> bool: # stop the threads self._logger.verbose("cancelling tasks...") - if self._config.options.get("keepalive") == "true": - if self._keep_alive_thread is not None: - self._keep_alive_thread.join() - self._keep_alive_thread = None # type: ignore - self._logger.notice("processing thread joined") + if self._flush_thread is not None: + self._flush_thread.join() + self._flush_thread = None + self._logger.notice("processing _flush_thread thread joined") + + if self._keep_alive_thread is not None: + self._keep_alive_thread.join() + self._keep_alive_thread = None + self._logger.notice("processing _keep_alive_thread thread joined") if self._listen_thread is not None: self._listen_thread.join() - self._listen_thread = None # type: ignore + self._listen_thread = None self._logger.notice("listening thread joined") self._logger.notice("finish succeeded") @@ -656,3 +839,22 @@ def _signal_exit(self) -> None: self._logger.error("socket.wait_closed failed: %s", e) self._socket = None # type: ignore + + def _inspect(self, msg_result: LiveResultResponse) -> bool: + sentence = msg_result.channel.alternatives[0].transcript + if len(sentence) == 0: + return True + + if msg_result.is_final: + with self._lock_flush: + self._logger.debug("AutoFlush is_final received") + self._last_datagram = None + else: + with self._lock_flush: + self._last_datagram = datetime.now() + self._logger.debug( + "AutoFlush interim received: %s", + str(self._last_datagram), + ) + + return True diff --git a/deepgram/options.py b/deepgram/options.py index 03a8b569..b8d2487c 100644 --- a/deepgram/options.py +++ b/deepgram/options.py @@ -7,6 +7,7 @@ import os from typing import Dict, Optional import logging +import numbers from deepgram import __version__ from deepgram.utils import verboselogs @@ -27,6 +28,16 @@ class DeepgramClientOptions: options: (Optional) Additional options for initializing the client. """ + _logger: verboselogs.VerboseLogger + + verbose: int + url: str + api_key: str + headers: Dict[str, str] + options: Dict[str, str] + + _inspect: bool + def __init__( self, api_key: str = "", @@ -35,14 +46,17 @@ def __init__( headers: Optional[Dict] = None, options: Optional[Dict] = None, ): - self.logger = verboselogs.VerboseLogger(__name__) - self.logger.addHandler(logging.StreamHandler()) + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) if api_key is None: api_key = "" self.verbose = verbose self.api_key = api_key + + if headers is None: + headers = {} self._update_headers(headers=headers) if len(url) == 0: @@ -53,6 +67,10 @@ def __init__( options = {} self.options = options + self._inspect = False + if self.is_auto_flush_enabled(): + self._inspect = True + def set_apikey(self, api_key: str): """ set_apikey: Sets the API key for the client. @@ -82,6 +100,43 @@ def _update_headers(self, headers: Optional[Dict] = None): if headers: self.headers.update(headers) + def is_keep_alive_enabled(self): + """ + is_keep_alive_enabled: Returns True if the client is configured to keep the connection alive. + """ + bool_auto_flush_delta_org = False + auto_flush_delta_org = self.options.get("keepalive") + if isinstance(auto_flush_delta_org, bool): + bool_auto_flush_delta_org = auto_flush_delta_org + else: + bool_auto_flush_delta_org = auto_flush_delta_org == "true" + + bool_auto_flush_delta_new = False + auto_flush_delta_new = self.options.get("keep_alive") + if isinstance(auto_flush_delta_new, bool): + bool_auto_flush_delta_new = auto_flush_delta_new + else: + bool_auto_flush_delta_new = auto_flush_delta_new == "true" + + return bool_auto_flush_delta_org or bool_auto_flush_delta_new + + def is_auto_flush_enabled(self): + """ + is_auto_flush_enabled: Returns True if the client is configured to auto-flush. + """ + auto_flush_delta = self.options.get("auto_flush_reply_delta") + return ( + auto_flush_delta is not None + and isinstance(auto_flush_delta, numbers.Number) + and auto_flush_delta > 0 + ) + + def is_inspecting_messages(self): + """ + is_inspecting_messages: Returns True if the client is inspecting messages. + """ + return self._inspect + class ClientOptionsFromEnv( DeepgramClientOptions @@ -90,6 +145,8 @@ class ClientOptionsFromEnv( This class extends DeepgramClientOptions and will attempt to use environment variables first before defaults. """ + _logger: verboselogs.VerboseLogger + def __init__( self, api_key: str = "", @@ -98,9 +155,9 @@ def __init__( headers: Optional[Dict] = None, options: Optional[Dict] = None, ): - self.logger = verboselogs.VerboseLogger(__name__) - self.logger.addHandler(logging.StreamHandler()) - self.logger.setLevel(verboselogs.WARNING) # temporary set for setup + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(verboselogs.WARNING) # temporary set for setup if api_key is None: api_key = "" @@ -108,12 +165,12 @@ def __init__( if api_key == "": api_key = os.getenv("DEEPGRAM_API_KEY", "") if api_key == "": - self.logger.critical("Deepgram API KEY is not set") + self._logger.critical("Deepgram API KEY is not set") raise DeepgramApiKeyError("Deepgram API KEY is not set") if url == "": url = os.getenv("DEEPGRAM_HOST", "api.deepgram.com") - self.logger.notice(f"Deepgram host is set to {url}") + self._logger.notice(f"Deepgram host is set to {url}") if verbose == verboselogs.WARNING: _loglevel = os.getenv("DEEPGRAM_LOGGING", "") @@ -122,36 +179,36 @@ def __init__( if isinstance(verbose, str): match verbose: case "NOTSET": - self.logger.notice("Logging level is set to NOTSET") + self._logger.notice("Logging level is set to NOTSET") verbose = verboselogs.NOTSET case "SPAM": - self.logger.notice("Logging level is set to SPAM") + self._logger.notice("Logging level is set to SPAM") verbose = verboselogs.SPAM case "DEBUG": - self.logger.notice("Logging level is set to DEBUG") + self._logger.notice("Logging level is set to DEBUG") verbose = verboselogs.DEBUG case "VERBOSE": - self.logger.notice("Logging level is set to VERBOSE") + self._logger.notice("Logging level is set to VERBOSE") verbose = verboselogs.VERBOSE case "NOTICE": - self.logger.notice("Logging level is set to NOTICE") + self._logger.notice("Logging level is set to NOTICE") verbose = verboselogs.NOTICE case "WARNING": - self.logger.notice("Logging level is set to WARNING") + self._logger.notice("Logging level is set to WARNING") verbose = verboselogs.WARNING case "SUCCESS": - self.logger.notice("Logging level is set to SUCCESS") + self._logger.notice("Logging level is set to SUCCESS") verbose = verboselogs.SUCCESS case "ERROR": - self.logger.notice("Logging level is set to ERROR") + self._logger.notice("Logging level is set to ERROR") verbose = verboselogs.ERROR case "CRITICAL": - self.logger.notice("Logging level is set to CRITICAL") + self._logger.notice("Logging level is set to CRITICAL") verbose = verboselogs.CRITICAL case _: - self.logger.notice("Logging level is set to WARNING") + self._logger.notice("Logging level is set to WARNING") verbose = verboselogs.WARNING - self.logger.notice(f"Logging level is set to {verbose}") + self._logger.notice(f"Logging level is set to {verbose}") if headers is None: headers = {} @@ -159,7 +216,7 @@ def __init__( header = os.getenv(f"DEEPGRAM_HEADER_{x}", None) if header is not None: headers[header] = os.getenv(f"DEEPGRAM_HEADER_VALUE_{x}", None) - self.logger.debug( + self._logger.debug( "Deepgram header %s is set with value %s", header, headers[header], @@ -167,7 +224,7 @@ def __init__( else: break if len(headers) == 0: - self.logger.notice("Deepgram headers are not set") + self._logger.notice("Deepgram headers are not set") headers = None if options is None: @@ -176,13 +233,13 @@ def __init__( param = os.getenv(f"DEEPGRAM_PARAM_{x}", None) if param is not None: options[param] = os.getenv(f"DEEPGRAM_PARAM_VALUE_{x}", None) - self.logger.debug( + self._logger.debug( "Deepgram option %s is set with value %s", param, options[param] ) else: break if len(options) == 0: - self.logger.notice("Deepgram options are not set") + self._logger.notice("Deepgram options are not set") options = None super().__init__( diff --git a/tests/edge_cases/auto_flush/async_microphone_mute/README.md b/tests/edge_cases/auto_flush/async_microphone_mute/README.md new file mode 100644 index 00000000..21af442f --- /dev/null +++ b/tests/edge_cases/auto_flush/async_microphone_mute/README.md @@ -0,0 +1,24 @@ +# Live API (Real-Time) Example + +This example uses the Microphone as input in order to detect conversation insights in what is being said. This example required additional components (for the microphone) to be installed in order for this example to function correctly. + +## Prerequisites + +This example will only work on Linux and MacOS. Windows platforms are not supported. + +## Configuration + +The SDK (and this example) needs to be initialized with your account's credentials `DEEPGRAM_API_KEY`, which are available in your [Deepgram Console][dg-console]. If you don't have a Deepgram account, you can [sign up here][dg-signup] for free. + +You must add your `DEEPGRAM_API_KEY` to your list of environment variables. We use environment variables because they are easy to configure, support PaaS-style deployments, and work well in containerized environments like Docker and Kubernetes. + +```bash +export DEEPGRAM_API_KEY=YOUR-APP-KEY-HERE +``` + +## Installation + +The Live API (Real-Time) example makes use of a [microphone package](https://github.com/deepgram/deepgram-python-sdk/tree/main/deepgram/audio/microphone) contained within the repository. That package makes use of the [PortAudio library](http://www.portaudio.com/) which is a cross-platform open source audio library. If you are on Linux, you can install this library using whatever package manager is available (yum, apt, etc.) on your operating system. If you are on macOS, you can install this library using [brew](https://brew.sh/). + +[dg-console]: https://console.deepgram.com/ +[dg-signup]: https://console.deepgram.com/signup diff --git a/tests/edge_cases/auto_flush/async_microphone_mute/main.py b/tests/edge_cases/auto_flush/async_microphone_mute/main.py new file mode 100644 index 00000000..2793a426 --- /dev/null +++ b/tests/edge_cases/auto_flush/async_microphone_mute/main.py @@ -0,0 +1,174 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from signal import SIGINT, SIGTERM +import asyncio +from dotenv import load_dotenv +import logging +from deepgram.utils import verboselogs +from time import sleep + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + LiveTranscriptionEvents, + LiveOptions, + Microphone, +) + +load_dotenv() + +# We will collect the is_final=true messages here so we can use them when the person finishes speaking +is_finals = [] + + +async def main(): + try: + loop = asyncio.get_event_loop() + + for signal in (SIGTERM, SIGINT): + loop.add_signal_handler( + signal, + lambda: asyncio.create_task( + shutdown(signal, loop, dg_connection, microphone) + ), + ) + + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + config = DeepgramClientOptions( + verbose=verboselogs.DEBUG, + options={ + "keepalive": "true", + "auto_flush_reply_delta": 2000, + }, + ) + deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + # deepgram: DeepgramClient = DeepgramClient() + + dg_connection = deepgram.listen.asynclive.v("1") + + async def on_open(self, open, **kwargs): + print(f"Connection Open") + + async def on_message(self, result, **kwargs): + global is_finals + sentence = result.channel.alternatives[0].transcript + if len(sentence) == 0: + return + if result.is_final: + # We need to collect these and concatenate them together when we get a speech_final=true + # See docs: https://developers.deepgram.com/docs/understand-endpointing-interim-results + is_finals.append(sentence) + + # Speech Final means we have detected sufficent silence to consider this end of speech + # Speech final is the lowest latency result as it triggers as soon an the endpointing value has triggered + if result.speech_final: + utterance = " ".join(is_finals) + print(f"Speech Final: {utterance}") + is_finals = [] + else: + # These are useful if you need real time captioning and update what the Interim Results produced + print(f"Is Final: {sentence}") + else: + # These are useful if you need real time captioning of what is being spoken + print(f"Interim Results: {sentence}") + + async def on_metadata(self, metadata, **kwargs): + print(f"Metadata: {metadata}") + + async def on_speech_started(self, speech_started, **kwargs): + print(f"Speech Started") + + async def on_utterance_end(self, utterance_end, **kwargs): + print(f"Utterance End") + global is_finals + if len(is_finals) > 0: + utterance = " ".join(is_finals) + print(f"Utterance End: {utterance}") + is_finals = [] + + async def on_close(self, close, **kwargs): + print(f"Connection Closed") + + async def on_error(self, error, **kwargs): + print(f"Handled Error: {error}") + + async def on_unhandled(self, unhandled, **kwargs): + print(f"Unhandled Websocket Message: {unhandled}") + + dg_connection.on(LiveTranscriptionEvents.Open, on_open) + dg_connection.on(LiveTranscriptionEvents.Transcript, on_message) + dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata) + dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started) + dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end) + dg_connection.on(LiveTranscriptionEvents.Close, on_close) + dg_connection.on(LiveTranscriptionEvents.Error, on_error) + dg_connection.on(LiveTranscriptionEvents.Unhandled, on_unhandled) + + # connect to websocket + options: LiveOptions = LiveOptions( + model="nova-2", + language="en-US", + # Apply smart formatting to the output + smart_format=True, + # Raw audio format deatils + encoding="linear16", + channels=1, + sample_rate=16000, + # To get UtteranceEnd, the following must be set: + interim_results=True, + utterance_end_ms="1000", + vad_events=True, + # Time in milliseconds of silence to wait for before finalizing speech + endpointing=300, + ) + + addons = { + # Prevent waiting for additional numbers + "no_delay": "true" + } + + print("\n\nStart talking! Press Ctrl+C to stop...\n") + if await dg_connection.start(options, addons=addons) is False: + print("Failed to connect to Deepgram") + return + + # Open a microphone stream on the default input device + microphone = Microphone(dg_connection.send) + + # start microphone + microphone.start() + + # wait until cancelled + try: + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + # This block will be executed when the shutdown coroutine cancels all tasks + pass + finally: + microphone.finish() + await dg_connection.finish() + + print("Finished") + + except Exception as e: + print(f"Could not open socket: {e}") + return + + +async def shutdown(signal, loop, dg_connection, microphone): + print(f"Received exit signal {signal.name}...") + microphone.finish() + await dg_connection.finish() + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + [task.cancel() for task in tasks] + print(f"Cancelling {len(tasks)} outstanding tasks") + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + print("Shutdown complete.") + + +asyncio.run(main()) diff --git a/tests/edge_cases/auto_flush/microphone_mute/README.md b/tests/edge_cases/auto_flush/microphone_mute/README.md new file mode 100644 index 00000000..21af442f --- /dev/null +++ b/tests/edge_cases/auto_flush/microphone_mute/README.md @@ -0,0 +1,24 @@ +# Live API (Real-Time) Example + +This example uses the Microphone as input in order to detect conversation insights in what is being said. This example required additional components (for the microphone) to be installed in order for this example to function correctly. + +## Prerequisites + +This example will only work on Linux and MacOS. Windows platforms are not supported. + +## Configuration + +The SDK (and this example) needs to be initialized with your account's credentials `DEEPGRAM_API_KEY`, which are available in your [Deepgram Console][dg-console]. If you don't have a Deepgram account, you can [sign up here][dg-signup] for free. + +You must add your `DEEPGRAM_API_KEY` to your list of environment variables. We use environment variables because they are easy to configure, support PaaS-style deployments, and work well in containerized environments like Docker and Kubernetes. + +```bash +export DEEPGRAM_API_KEY=YOUR-APP-KEY-HERE +``` + +## Installation + +The Live API (Real-Time) example makes use of a [microphone package](https://github.com/deepgram/deepgram-python-sdk/tree/main/deepgram/audio/microphone) contained within the repository. That package makes use of the [PortAudio library](http://www.portaudio.com/) which is a cross-platform open source audio library. If you are on Linux, you can install this library using whatever package manager is available (yum, apt, etc.) on your operating system. If you are on macOS, you can install this library using [brew](https://brew.sh/). + +[dg-console]: https://console.deepgram.com/ +[dg-signup]: https://console.deepgram.com/signup diff --git a/tests/edge_cases/auto_flush/microphone_mute/main.py b/tests/edge_cases/auto_flush/microphone_mute/main.py new file mode 100644 index 00000000..44177be6 --- /dev/null +++ b/tests/edge_cases/auto_flush/microphone_mute/main.py @@ -0,0 +1,148 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from dotenv import load_dotenv +import logging +from deepgram.utils import verboselogs +from time import sleep + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + LiveTranscriptionEvents, + LiveOptions, + Microphone, +) + +load_dotenv() + +# We will collect the is_final=true messages here so we can use them when the person finishes speaking +is_finals = [] + + +def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + config = DeepgramClientOptions( + verbose=verboselogs.DEBUG, + options={ + "keepalive": "true", + "auto_flush_reply_delta": 2000, + }, + ) + deepgram: DeepgramClient = DeepgramClient("", config) + + dg_connection = deepgram.listen.live.v("1") + + def on_open(self, open, **kwargs): + print(f"Connection Open") + + def on_message(self, result, **kwargs): + global is_finals + sentence = result.channel.alternatives[0].transcript + if len(sentence) == 0: + return + if result.is_final: + # We need to collect these and concatenate them together when we get a speech_final=true + # See docs: https://developers.deepgram.com/docs/understand-endpointing-interim-results + is_finals.append(sentence) + + # Speech Final means we have detected sufficent silence to consider this end of speech + # Speech final is the lowest latency result as it triggers as soon an the endpointing value has triggered + if result.speech_final: + utterance = " ".join(is_finals) + print(f"Speech Final: {utterance}") + is_finals = [] + else: + # These are useful if you need real time captioning and update what the Interim Results produced + print(f"Is Final: {sentence}") + else: + # These are useful if you need real time captioning of what is being spoken + print(f"Interim Results: {sentence}") + + def on_metadata(self, metadata, **kwargs): + print(f"Metadata: {metadata}") + + def on_speech_started(self, speech_started, **kwargs): + print(f"Speech Started") + + def on_utterance_end(self, utterance_end, **kwargs): + print(f"Utterance End") + global is_finals + if len(is_finals) > 0: + utterance = " ".join(is_finals) + print(f"Utterance End: {utterance}") + is_finals = [] + + def on_close(self, close, **kwargs): + print(f"Connection Closed") + + def on_error(self, error, **kwargs): + print(f"Handled Error: {error}") + + def on_unhandled(self, unhandled, **kwargs): + print(f"Unhandled Websocket Message: {unhandled}") + + dg_connection.on(LiveTranscriptionEvents.Open, on_open) + dg_connection.on(LiveTranscriptionEvents.Transcript, on_message) + dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata) + dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started) + dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end) + dg_connection.on(LiveTranscriptionEvents.Close, on_close) + dg_connection.on(LiveTranscriptionEvents.Error, on_error) + dg_connection.on(LiveTranscriptionEvents.Unhandled, on_unhandled) + + options: LiveOptions = LiveOptions( + model="nova-2", + language="en-US", + # Apply smart formatting to the output + smart_format=True, + # Raw audio format details + encoding="linear16", + channels=1, + sample_rate=16000, + # To get UtteranceEnd, the following must be set: + interim_results=True, + utterance_end_ms="1000", + vad_events=True, + # Time in milliseconds of silence to wait for before finalizing speech + endpointing=300, + ) + + addons = { + # Prevent waiting for additional numbers + "no_delay": "true" + } + + print("\n\nPress Enter to stop recording...\n\n") + if dg_connection.start(options, addons=addons) is False: + print("Failed to connect to Deepgram") + return + + # Open a microphone stream on the default input device + microphone = Microphone(dg_connection.send) + + # start microphone + microphone.start() + + # wait until finished + input("") + + # Wait for the microphone to close + microphone.finish() + + # Indicate that we've finished + dg_connection.finish() + + print("Finished") + # sleep(30) # wait 30 seconds to see if there is any additional socket activity + # print("Really done!") + + except Exception as e: + print(f"Could not open socket: {e}") + return + + +if __name__ == "__main__": + main()