From 2ab9a61c70b0bb3687a290081e03f49e3ddbba2b Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Mon, 15 Jul 2024 10:58:47 -0700 Subject: [PATCH] [HOLD] Reintroduce TTS WS --- deepgram/__init__.py | 34 +- deepgram/client.py | 34 +- deepgram/clients/__init__.py | 34 +- deepgram/clients/abstract_async_client.py | 1 - deepgram/clients/abstract_sync_client.py | 6 +- .../listen/v1/websocket/async_client.py | 38 +- .../clients/listen/v1/websocket/client.py | 38 +- deepgram/clients/speak/__init__.py | 24 +- deepgram/clients/speak/client.py | 44 +- deepgram/clients/speak/enums.py | 13 +- deepgram/clients/speak/v1/__init__.py | 31 +- .../clients/speak/v1/websocket/__init__.py | 17 + .../speak/v1/websocket/async_client.py | 834 ++++++++++++++++++ deepgram/clients/speak/v1/websocket/client.py | 801 +++++++++++++++++ .../clients/speak/v1/websocket/helpers.py | 43 + .../clients/speak/v1/websocket/options.py | 7 + .../clients/speak/v1/websocket/response.py | 185 ++++ deepgram/options.py | 44 +- examples/requirements-examples.txt | 5 +- examples/speech-to-text/rest/url/main.py | 4 +- .../websocket/async_complete/main.py | 102 +++ .../text-to-speech/websocket/complete/main.py | 104 +++ .../websocket/simple/main-containerized.py | 84 ++ .../simple/main-non-containerized.py | 84 ++ 24 files changed, 2468 insertions(+), 143 deletions(-) create mode 100644 deepgram/clients/speak/v1/websocket/__init__.py create mode 100644 deepgram/clients/speak/v1/websocket/async_client.py create mode 100644 deepgram/clients/speak/v1/websocket/client.py create mode 100644 deepgram/clients/speak/v1/websocket/helpers.py create mode 100644 deepgram/clients/speak/v1/websocket/options.py create mode 100644 deepgram/clients/speak/v1/websocket/response.py create mode 100644 examples/text-to-speech/websocket/async_complete/main.py create mode 100644 examples/text-to-speech/websocket/complete/main.py create mode 100644 examples/text-to-speech/websocket/simple/main-containerized.py create mode 100644 examples/text-to-speech/websocket/simple/main-non-containerized.py diff --git a/deepgram/__init__.py b/deepgram/__init__.py index f0984073..4ffb520d 100644 --- a/deepgram/__init__.py +++ b/deepgram/__init__.py @@ -96,12 +96,12 @@ from .client import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, ) -from .client import SpeakWebSocketEvents +from .client import SpeakWebSocketEvents, SpeakWebSocketMessage ## speak REST from .client import ( @@ -115,21 +115,21 @@ SpeakRESTResponse, ) -# ## speak WebSocket -# from .client import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .client import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## speak WebSocket +from .client import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, +) +from .client import ( + SpeakWebSocketResponse, + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage from .client import ManageClient, AsyncManageClient diff --git a/deepgram/client.py b/deepgram/client.py index 05ce0bd7..5dfb4d6f 100644 --- a/deepgram/client.py +++ b/deepgram/client.py @@ -100,12 +100,12 @@ from .clients import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, ) -from .clients import SpeakWebSocketEvents +from .clients import SpeakWebSocketEvents, SpeakWebSocketMessage ## speak REST from .clients import ( @@ -119,21 +119,21 @@ SpeakRESTResponse, ) -# ## speak WebSocket -# from .clients import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .clients import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## speak WebSocket +from .clients import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, +) +from .clients import ( + SpeakWebSocketResponse, + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage client classes/input from .clients import ManageClient, AsyncManageClient diff --git a/deepgram/clients/__init__.py b/deepgram/clients/__init__.py index 8aa1661e..d34087bc 100644 --- a/deepgram/clients/__init__.py +++ b/deepgram/clients/__init__.py @@ -105,13 +105,13 @@ from .speak import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, ) -from .speak import SpeakWebSocketEvents +from .speak import SpeakWebSocketEvents, SpeakWebSocketMessage ## text-to-speech REST from .speak import ( @@ -125,21 +125,21 @@ SpeakRESTResponse, ) -# ## text-to-speech WebSocket -# from .speak import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .speak import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## text-to-speech WebSocket +from .speak import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, +) +from .speak import ( + SpeakWebSocketResponse, + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage from .manage import ManageClient, AsyncManageClient diff --git a/deepgram/clients/abstract_async_client.py b/deepgram/clients/abstract_async_client.py index 03b86a74..1ff53ba9 100644 --- a/deepgram/clients/abstract_async_client.py +++ b/deepgram/clients/abstract_async_client.py @@ -348,7 +348,6 @@ async def _handle_request_raw( client = httpx.AsyncClient(timeout=timeout, transport=transport) if transport: kwargs.pop("transport") - kwargs.pop("transport") req = client.build_request(method, _url, headers=_headers, **kwargs) return await client.send(req, stream=True) diff --git a/deepgram/clients/abstract_sync_client.py b/deepgram/clients/abstract_sync_client.py index 6e9d2976..bff6e27c 100644 --- a/deepgram/clients/abstract_sync_client.py +++ b/deepgram/clients/abstract_sync_client.py @@ -337,9 +337,9 @@ def _handle_request_raw( try: transport = kwargs.get("transport") - with httpx.Client(timeout=timeout, transport=transport) as client: - if transport: - kwargs.pop("transport") + client = httpx.Client(timeout=timeout, transport=transport) + if transport: + kwargs.pop("transport") req = client.build_request(method, _url, headers=_headers, **kwargs) return client.send(req, stream=True) diff --git a/deepgram/clients/listen/v1/websocket/async_client.py b/deepgram/clients/listen/v1/websocket/async_client.py index 87603e7a..550bbf2f 100644 --- a/deepgram/clients/listen/v1/websocket/async_client.py +++ b/deepgram/clients/listen/v1/websocket/async_client.py @@ -53,11 +53,10 @@ class AsyncListenWebSocketClient: # pylint: disable=too-many-instance-attribute _socket: WebSocketClientProtocol _event_handlers: Dict[LiveTranscriptionEvents, list] - _last_datagram: Optional[datetime] = None - _listen_thread: Union[asyncio.Task, None] _keep_alive_thread: Union[asyncio.Task, None] _flush_thread: Union[asyncio.Task, None] + _last_datagram: Optional[datetime] = None _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -79,11 +78,10 @@ def __init__(self, config: DeepgramClientOptions): self._keep_alive_thread = None self._flush_thread = None - # exit + # events self._exit_event = asyncio.Event() - # auto flush - self._flush_event = asyncio.Event() + # init handlers self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() } @@ -174,7 +172,7 @@ async def start( self._logger.notice("keepalive is disabled") # flush thread - if self._config.is_auto_flush_enabled(): + if self._config.is_auto_flush_reply_enabled(): self._logger.notice("autoflush is enabled") self._flush_thread = asyncio.create_task(self._flush()) else: @@ -219,7 +217,7 @@ async def start( raise return False - def is_connected(self) -> bool: + async def is_connected(self) -> bool: """ Returns the connection status of the WebSocket. """ @@ -311,7 +309,7 @@ async def _listening(self) -> None: self._logger.verbose("LiveResultResponse: %s", msg_result) # auto flush - if self._config.is_inspecting_messages(): + if self._config.is_inspecting_listen(): inspect_res = await self._inspect(msg_result) if not inspect_res: self._logger.error("inspect_res failed") @@ -400,6 +398,8 @@ async def _listening(self) -> None: self._logger.debug("AsyncListenWebSocketClient._listening LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in AsyncListenWebSocketClient._listening with code %s: %s", e.code, @@ -508,11 +508,13 @@ async def _keep_alive(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_keep_alive({e.code}) exiting gracefully") self._logger.debug("AsyncListenWebSocketClient._keep_alive LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in AsyncListenWebSocketClient._keep_alive with code %s: %s", e.code, @@ -635,11 +637,13 @@ async def _flush(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_flush({e.code}) exiting gracefully") self._logger.debug("AsyncListenWebSocketClient._flush LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in AsyncListenWebSocketClient._flush with code %s: %s", e.code, @@ -731,6 +735,11 @@ async def send(self, data: Union[str, bytes]) -> bool: self._logger.debug("AsyncListenWebSocketClient.send LEAVE") return False + if not await self.is_connected(): + self._logger.notice("is_connected is False") + self._logger.debug("AsyncListenWebSocketClient.send LEAVE") + return False + if self._socket is not None: try: await self._socket.send(data) @@ -741,7 +750,7 @@ async def send(self, data: Union[str, bytes]) -> bool: raise return True except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"send({e.code}) exiting gracefully") self._logger.debug("AsyncListenWebSocketClient.send LEAVE") if self._config.options.get("termination_exception_send") == "true": @@ -897,7 +906,7 @@ async def _signal_exit(self) -> None: except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) except websockets.exceptions.ConnectionClosed as e: - self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code) + self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) except websockets.exceptions.WebSocketException as e: self._logger.error("_signal_exit - WebSocketException: %s", str(e)) except Exception as e: # pylint: disable=broad-except @@ -931,6 +940,11 @@ async def _signal_exit(self) -> None: self._socket = None # type: ignore async def _inspect(self, msg_result: LiveResultResponse) -> bool: + # auto flush_inspect is generically used to track any messages you might want to snoop on + # place additional logic here to inspect messages of interest + + # for auto flush functionality + # set the last datagram sentence = msg_result.channel.alternatives[0].transcript if len(sentence) == 0: return True diff --git a/deepgram/clients/listen/v1/websocket/client.py b/deepgram/clients/listen/v1/websocket/client.py index 8f074800..0684149c 100644 --- a/deepgram/clients/listen/v1/websocket/client.py +++ b/deepgram/clients/listen/v1/websocket/client.py @@ -56,11 +56,10 @@ class ListenWebSocketClient: # pylint: disable=too-many-instance-attributes _lock_flush: threading.Lock _event_handlers: Dict[LiveTranscriptionEvents, list] - _last_datagram: Optional[datetime] = None - _listen_thread: Union[threading.Thread, None] _keep_alive_thread: Union[threading.Thread, None] _flush_thread: Union[threading.Thread, None] + _last_datagram: Optional[datetime] = None _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -88,9 +87,9 @@ def __init__(self, config: DeepgramClientOptions): # auto flush self._last_datagram = None - self._flush_event = threading.Event() self._lock_flush = threading.Lock() + # init handlers self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() } @@ -178,7 +177,7 @@ def start( self._logger.notice("keepalive is disabled") # flush thread - if self._config.is_auto_flush_enabled(): + if self._config.is_auto_flush_reply_enabled(): self._logger.notice("autoflush is enabled") self._flush_thread = threading.Thread(target=self._flush) self._flush_thread.start() @@ -295,7 +294,7 @@ def _listening( self._logger.verbose("LiveResultResponse: %s", msg_result) # auto flush - if self._config.is_inspecting_messages(): + if self._config.is_inspecting_listen(): inspect_res = self._inspect(msg_result) if not inspect_res: self._logger.error("inspect_res failed") @@ -379,11 +378,13 @@ def _listening( return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_listening({e.code}) exiting gracefully") self._logger.debug("ListenWebSocketClient._listening LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in ListenWebSocketClient._listening with code %s: %s", e.code, @@ -486,11 +487,13 @@ def _keep_alive(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_keep_alive({e.code}) exiting gracefully") self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in ListenWebSocketClient._keep_alive with code %s: %s", e.code, @@ -575,9 +578,10 @@ def _flush(self) -> None: return delta_in_ms = float(delta_in_ms_str) + _flush_event = threading.Event() while True: try: - self._flush_event.wait(timeout=HALF_SECOND) + _flush_event.wait(timeout=HALF_SECOND) if self._exit_event.is_set(): self._logger.notice("_flush exiting gracefully") @@ -611,11 +615,13 @@ def _flush(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_flush({e.code}) exiting gracefully") self._logger.debug("ListenWebSocketClient._flush LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in ListenWebSocketClient._flush with code %s: %s", e.code, @@ -698,18 +704,23 @@ def send(self, data: Union[str, bytes]) -> bool: self._logger.debug("ListenWebSocketClient.send LEAVE") return False + if not self.is_connected(): + self._logger.notice("is_connected is False") + self._logger.debug("ListenWebSocketClient.send LEAVE") + return False + if self._socket is not None: with self._lock_send: try: self._socket.send(data) except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice(f"send() exiting gracefully: {e.code}") - self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") + self._logger.debug("ListenWebSocketClient.send LEAVE") if self._config.options.get("termination_exception_send") == "true": raise return True except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"send({e.code}) exiting gracefully") self._logger.debug("ListenWebSocketClient.send LEAVE") if ( @@ -890,6 +901,11 @@ def _signal_exit(self) -> None: self._socket = None # type: ignore def _inspect(self, msg_result: LiveResultResponse) -> bool: + # auto flush_inspect is generically used to track any messages you might want to snoop on + # place additional logic here to inspect messages of interest + + # for auto flush functionality + # set the last datagram sentence = msg_result.channel.alternatives[0].transcript if len(sentence) == 0: return True diff --git a/deepgram/clients/speak/__init__.py b/deepgram/clients/speak/__init__.py index 07d8f42e..64855cee 100644 --- a/deepgram/clients/speak/__init__.py +++ b/deepgram/clients/speak/__init__.py @@ -2,20 +2,20 @@ # Use of this source code is governed by a MIT license that can be found in the LICENSE file. # SPDX-License-Identifier: MIT -from .enums import SpeakWebSocketEvents +from .enums import SpeakWebSocketEvents, SpeakWebSocketMessage from ...options import DeepgramClientOptions, ClientOptionsFromEnv from .client import ( SpeakClient, # backward compat SpeakRESTClient, AsyncSpeakRESTClient, - # SpeakWebSocketClient, - # AsyncSpeakWebSocketClient, + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, ) from .client import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWebSocketOptions, FileSource, SpeakRestSource, SpeakSource, @@ -23,12 +23,12 @@ from .client import ( SpeakResponse, # backward compat SpeakRESTResponse, - # SpeakWebSocketResponse, - # OpenResponse, - # MetadataResponse, - # FlushedResponse, - # CloseResponse, - # UnhandledResponse, - # WarningResponse, - # ErrorResponse, + SpeakWebSocketResponse, + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, ) diff --git a/deepgram/clients/speak/client.py b/deepgram/clients/speak/client.py index fc5e902e..e429fba0 100644 --- a/deepgram/clients/speak/client.py +++ b/deepgram/clients/speak/client.py @@ -5,27 +5,27 @@ from .v1 import ( SpeakRESTClient as SpeakRESTClientLatest, AsyncSpeakRESTClient as AsyncSpeakRESTClientLatest, - # SpeakWebSocketClient as SpeakWebSocketClientLatest, - # AsyncSpeakWebSocketClient as AsyncSpeakWebSocketClientLatest, + SpeakWebSocketClient as SpeakWebSocketClientLatest, + AsyncSpeakWebSocketClient as AsyncSpeakWebSocketClientLatest, ) from .v1 import ( SpeakOptions as SpeakOptionsLatest, SpeakRESTOptions as SpeakRESTOptionsLatest, - # SpeakWebSocketOptions as SpeakWebSocketOptionsLatest, + SpeakWebSocketOptions as SpeakWebSocketOptionsLatest, FileSource as FileSourceLatest, SpeakRestSource as SpeakRestSourceLatest, SpeakSource as SpeakSourceLatest, ) from .v1 import ( SpeakRESTResponse as SpeakRESTResponseLatest, - # SpeakWebSocketResponse as SpeakWebSocketResponseLatest, - # OpenResponse as OpenResponseLatest, - # MetadataResponse as MetadataResponseLatest, - # FlushedResponse as FlushedResponseLatest, - # CloseResponse as CloseResponseLatest, - # UnhandledResponse as UnhandledResponseLatest, - # WarningResponse as WarningResponseLatest, - # ErrorResponse as ErrorResponseLatest, + SpeakWebSocketResponse as SpeakWebSocketResponseLatest, + OpenResponse as OpenResponseLatest, + MetadataResponse as MetadataResponseLatest, + FlushedResponse as FlushedResponseLatest, + CloseResponse as CloseResponseLatest, + UnhandledResponse as UnhandledResponseLatest, + WarningResponse as WarningResponseLatest, + ErrorResponse as ErrorResponseLatest, ) # The client.py points to the current supported version in the SDK. @@ -35,21 +35,21 @@ # input SpeakOptions = SpeakOptionsLatest SpeakRESTOptions = SpeakRESTOptionsLatest -# SpeakWebSocketOptions = SpeakWebSocketOptionsLatest +SpeakWebSocketOptions = SpeakWebSocketOptionsLatest SpeakRestSource = SpeakRestSourceLatest FileSource = FileSourceLatest SpeakSource = SpeakSourceLatest # output SpeakRESTResponse = SpeakRESTResponseLatest -# SpeakWebSocketResponse = SpeakWebSocketResponseLatest -# OpenResponse = OpenResponseLatest -# MetadataResponse = MetadataResponseLatest -# FlushedResponse = FlushedResponseLatest -# CloseResponse = CloseResponseLatest -# UnhandledResponse = UnhandledResponseLatest -# WarningResponse = WarningResponseLatest -# ErrorResponse = ErrorResponseLatest +SpeakWebSocketResponse = SpeakWebSocketResponseLatest +OpenResponse = OpenResponseLatest +MetadataResponse = MetadataResponseLatest +FlushedResponse = FlushedResponseLatest +CloseResponse = CloseResponseLatest +UnhandledResponse = UnhandledResponseLatest +WarningResponse = WarningResponseLatest +ErrorResponse = ErrorResponseLatest # backward compatibility @@ -59,5 +59,5 @@ # clients SpeakRESTClient = SpeakRESTClientLatest AsyncSpeakRESTClient = AsyncSpeakRESTClientLatest -# SpeakWebSocketClient = SpeakWebSocketClientLatest -# AsyncSpeakWebSocketClient = AsyncSpeakWebSocketClientLatest +SpeakWebSocketClient = SpeakWebSocketClientLatest +AsyncSpeakWebSocketClient = AsyncSpeakWebSocketClientLatest diff --git a/deepgram/clients/speak/enums.py b/deepgram/clients/speak/enums.py index 17007aaf..abffe980 100644 --- a/deepgram/clients/speak/enums.py +++ b/deepgram/clients/speak/enums.py @@ -7,6 +7,17 @@ # Constants mapping to events from the Deepgram API +class SpeakWebSocketMessage(StrEnum): + """ + Enumerates the possible message types that can be received from the Deepgram API + """ + + Speak: str = "Speak" + Flush: str = "Flush" + Reset: str = "Reset" + Close: str = "Close" + + class SpeakWebSocketEvents(StrEnum): """ Enumerates the possible events that can be received from the Deepgram API @@ -16,7 +27,7 @@ class SpeakWebSocketEvents(StrEnum): Close: str = "Close" AudioData: str = "AudioData" Metadata: str = "Metadata" - Flush: str = "Flush" + Flush: str = "Flushed" Unhandled: str = "Unhandled" Error: str = "Error" Warning: str = "Warning" diff --git a/deepgram/clients/speak/v1/__init__.py b/deepgram/clients/speak/v1/__init__.py index 97e63d7a..b8fa18eb 100644 --- a/deepgram/clients/speak/v1/__init__.py +++ b/deepgram/clients/speak/v1/__init__.py @@ -11,10 +11,9 @@ SpeakSource, FileSource, ) - -# from .websocket import ( -# SpeakWebSocketOptions, -# ) +from .websocket import ( + SpeakWebSocketOptions, +) from ....options import DeepgramClientOptions, ClientOptionsFromEnv # rest @@ -22,15 +21,15 @@ from .rest import SpeakRESTResponse -# # websocket -# from .websocket import SpeakWebSocketClient, AsyncSpeakWebSocketClient -# from .websocket import ( -# SpeakWebSocketResponse, -# OpenResponse, -# MetadataResponse, -# FlushedResponse, -# CloseResponse, -# UnhandledResponse, -# WarningResponse, -# ErrorResponse, -# ) +# websocket +from .websocket import SpeakWebSocketClient, AsyncSpeakWebSocketClient +from .websocket import ( + SpeakWebSocketResponse, + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, +) diff --git a/deepgram/clients/speak/v1/websocket/__init__.py b/deepgram/clients/speak/v1/websocket/__init__.py new file mode 100644 index 00000000..e6698987 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from .client import SpeakWebSocketClient +from .async_client import AsyncSpeakWebSocketClient +from .response import ( + SpeakWebSocketResponse, + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, +) +from .options import SpeakWebSocketOptions diff --git a/deepgram/clients/speak/v1/websocket/async_client.py b/deepgram/clients/speak/v1/websocket/async_client.py new file mode 100644 index 00000000..2468ba05 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/async_client.py @@ -0,0 +1,834 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT +import asyncio +import json +import logging +from typing import Dict, Union, Optional, cast, Any +from datetime import datetime +import threading + +import websockets +from websockets.client import WebSocketClientProtocol + +from .....utils import verboselogs +from .....options import DeepgramClientOptions +from ...enums import SpeakWebSocketEvents, SpeakWebSocketMessage +from .helpers import convert_to_websocket_url, append_query_params +from ...errors import DeepgramError + +from .response import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + WarningResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import SpeakWebSocketOptions + +ONE_SECOND = 1 +HALF_SECOND = 0.5 +DEEPGRAM_INTERVAL = 5 +PING_INTERVAL = 20 + + +class AsyncSpeakWebSocketClient: # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's text-to-speech services over WebSockets. + + This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. + + Args: + config (DeepgramClientOptions): all the options for the client. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _endpoint: str + _websocket_url: str + + _socket: WebSocketClientProtocol + _event_handlers: Dict[SpeakWebSocketEvents, list] + + _listen_thread: Union[asyncio.Task, None] + _flush_thread: Union[asyncio.Task, None] + _last_datagram: Optional[datetime] = None + _flush_count: int + + _kwargs: Optional[Dict] = None + _addons: Optional[Dict] = None + _options: Optional[Dict] = None + _headers: Optional[Dict] = None + + def __init__(self, config: DeepgramClientOptions): + if config is None: + raise DeepgramError("Config are required") + + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + + self._config = config + self._endpoint = "v1/speak" + + self._listen_thread = None + self._flush_thread = None + + # events + self._exit_event = asyncio.Event() + + # flush + self._last_datagram = None + self._flush_count = 0 + + # init handlers + self._event_handlers = { + event: [] for event in SpeakWebSocketEvents.__members__.values() + } + self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) + + # pylint: disable=too-many-branches,too-many-statements + async def start( + self, + options: Optional[Union[SpeakWebSocketOptions, Dict]] = None, + addons: Optional[Dict] = None, + headers: Optional[Dict] = None, + members: Optional[Dict] = None, + **kwargs, + ) -> bool: + """ + Starts the WebSocket connection for text-to-speech synthesis. + """ + self._logger.debug("AsyncSpeakStreamClient.start ENTER") + self._logger.info("options: %s", options) + self._logger.info("addons: %s", addons) + self._logger.info("headers: %s", headers) + self._logger.info("members: %s", members) + self._logger.info("kwargs: %s", kwargs) + + if isinstance(options, SpeakWebSocketOptions) and not options.check(): + self._logger.error("options.check failed") + self._logger.debug("SpeakStreamClient.start LEAVE") + raise DeepgramError("Fatal text-to-speech options error") + + self._addons = addons + self._headers = headers + + # add "members" as members of the class + if members is not None: + self.__dict__.update(members) + + # set kwargs as members of the class + if kwargs is not None: + self._kwargs = kwargs + else: + self._kwargs = {} + + if isinstance(options, SpeakWebSocketOptions): + self._logger.info("SpeakWebSocketOptions switching class -> dict") + self._options = options.to_dict() + elif options is not None: + self._options = options + else: + self._options = {} + + combined_options = self._options + if self._addons is not None: + self._logger.info("merging addons to options") + combined_options.update(self._addons) + self._logger.info("new options: %s", combined_options) + self._logger.debug("combined_options: %s", combined_options) + + combined_headers = self._config.headers + if self._headers is not None: + self._logger.info("merging headers to options") + combined_headers.update(self._headers) + self._logger.info("new headers: %s", combined_headers) + self._logger.debug("combined_headers: %s", combined_headers) + + url_with_params = append_query_params(self._websocket_url, combined_options) + + try: + self._socket = await websockets.connect( + url_with_params, extra_headers=combined_headers + ) + self._exit_event.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # listen thread + self._listen_thread = asyncio.create_task(self._listening()) + + # flush thread + if self._config.is_auto_flush_speak_enabled(): + self._logger.notice("autoflush is enabled") + self._flush_thread = asyncio.create_task(self._flush()) + else: + self._logger.notice("autoflush is disabled") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # push open event + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Open), + OpenResponse(type=SpeakWebSocketEvents.Open), + ) + + self._logger.notice("start succeeded") + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + return True + except websockets.ConnectionClosed as e: + self._logger.notice( + "ConnectionClosed in AsyncSpeakStreamClient.start: %s", e + ) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient.start: %s", e + ) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient.start: %s", e + ) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + + # pylint: enable=too-many-branches,too-many-statements + + async def is_connected(self) -> bool: + """ + Returns the connection status of the WebSocket. + """ + return self._socket is not None + + def on(self, event: SpeakWebSocketEvents, handler) -> None: + """ + Registers event handlers for specific events. + """ + self._logger.info("event subscribed: %s", event) + if event in SpeakWebSocketEvents.__members__.values() and callable(handler): + if handler not in self._event_handlers[event]: + self._event_handlers[event].append(handler) + + # triggers the registered event handlers for a specific event + async def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: + """ + Emits events to the registered event handlers. + """ + self._logger.debug("AsyncSpeakStreamClient._emit ENTER") + self._logger.debug("callback handlers for: %s", event) + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + tasks = [] + for handler in self._event_handlers[event]: + task = asyncio.create_task(handler(self, *args, **kwargs)) + tasks.append(task) + + if tasks: + self._logger.debug("waiting for tasks to finish...") + await asyncio.gather(*tasks, return_exceptions=True) + tasks.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.debug("AsyncSpeakStreamClient._emit LEAVE") + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + async def _listening(self) -> None: + """ + Listens for messages from the WebSocket connection. + """ + self._logger.debug("AsyncSpeakStreamClient._listening ENTER") + + while True: + try: + if self._exit_event.is_set(): + self._logger.notice("_listening exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + if self._socket is None: + self._logger.warning("socket is empty") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + message = await self._socket.recv() + + if message is None: + self._logger.spam("message is None") + continue + + if isinstance(message, bytes): + self._logger.debug("Binary data received") + + # auto flush + if self._config.is_inspecting_speak(): + inspect_res = await self._inspect() + if not inspect_res: + self._logger.error("inspect_res failed") + + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), + data=message, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + else: + data = json.loads(message) + response_type = data.get("type") + self._logger.debug( + "response_type: %s, data: %s", response_type, data + ) + + match response_type: + case SpeakWebSocketEvents.Open: + open_result: OpenResponse = OpenResponse.from_json(message) + self._logger.verbose("OpenResponse: %s", open_result) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Open), + open=open_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Metadata: + meta_result: MetadataResponse = MetadataResponse.from_json( + message + ) + self._logger.verbose("MetadataResponse: %s", meta_result) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), + metadata=meta_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Flush: + fl_result: FlushedResponse = FlushedResponse.from_json( + message + ) + self._logger.verbose("FlushedResponse: %s", fl_result) + + # auto flush + if self._config.is_inspecting_speak(): + self._flush_count -= 1 + self._logger.debug( + "Decrement AutoFlush count: %d", + self._flush_count, + ) + + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Flush), + flushed=fl_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Close: + close_result: CloseResponse = CloseResponse.from_json( + message + ) + self._logger.verbose("CloseResponse: %s", close_result) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + close=close_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Warning: + war_warning: WarningResponse = WarningResponse.from_json( + message + ) + self._logger.verbose("WarningResponse: %s", war_warning) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), + warning=war_warning, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Error: + err_error: ErrorResponse = ErrorResponse.from_json(message) + self._logger.verbose("ErrorResponse: %s", err_error) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case _: + self._logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, + ) + unhandled_error: UnhandledResponse = UnhandledResponse( + type=SpeakWebSocketEvents( + SpeakWebSocketEvents.Unhandled + ), + raw=message, + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), + unhandled=unhandled_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + # no need to call self._signal_exit() here because we are already closed + # note: this is different than the listen websocket client + self._logger.notice( + "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s", + e.code, + e.reason, + ) + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient._listening: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in AsyncSpeakStreamClient._listening", + f"{e}", + "WebSocketException", + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "Exception in AsyncSpeakStreamClient._listening: %s", e + ) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncSpeakStreamClient._listening", + f"{e}", + "Exception", + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: disable=too-many-return-statements + + ## pylint: disable=too-many-return-statements,too-many-statements + async def _flush(self) -> None: + self._logger.debug("AsyncSpeakStreamClient._flush ENTER") + + delta_in_ms_str = self._config.options.get("auto_flush_speak_delta") + if delta_in_ms_str is None: + self._logger.error("auto_flush_speak_delta is None") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + delta_in_ms = float(delta_in_ms_str) + + while True: + try: + await asyncio.sleep(HALF_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_flush exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + + if self._socket is None: + self._logger.notice("socket is None, exiting flush") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + + if self._last_datagram is None: + self._logger.debug("AutoFlush last_datagram is None") + continue + + delta = datetime.now() - self._last_datagram + diff_in_ms = delta.total_seconds() * 1000 + self._logger.debug("AutoFlush delta: %f", diff_in_ms) + if diff_in_ms < delta_in_ms: + self._logger.debug("AutoFlush delta is less than threshold") + continue + + await self.flush() + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + + # no need to call self._signal_exit() here because we are already closed + # note: this is different than the listen websocket client + self._logger.notice( + "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s", + e.code, + e.reason, + ) + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient._flush: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in AsyncSpeakStreamClient._flush", + f"{e}", + "Exception", + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in AsyncSpeakStreamClient._flush: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncSpeakStreamClient._flush", + f"{e}", + "Exception", + ) + self._logger.error( + "Exception in AsyncSpeakStreamClient._flush: %s", str(e) + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements + + async def send_text(self, text_input: str) -> bool: + """ + Sends text to the WebSocket connection to generate audio. + + Args: + text_input (str): The raw text to be synthesized. This function will automatically wrap + the text in a JSON object of type "Speak" with the key "text". + + Returns: + bool: True if the text was successfully sent, False otherwise. + """ + return await self.send_raw(json.dumps({"type": "Speak", "text": text_input})) + + async def send(self, text_input: str) -> bool: + """ + Alias for send_text. Please see send_text for more information. + """ + return await self.send_text(text_input) + + # pylint: disable=unused-argument + async def send_control( + self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = "" + ) -> bool: + """ + Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection. + + Args: + msg_type (SpeakWebSocketEvents): The type of control message to send. + (Optional) data (str): The data to send with the control message. + + Returns: + bool: True if the control message was successfully sent, False otherwise. + """ + control_msg = json.dumps({"type": msg_type}) + return await self.send_raw(control_msg) + + # pylint: enable=unused-argument + + # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements + async def send_raw(self, msg: str) -> bool: + """ + Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. + + Args: + msg (str): The raw message to send over the WebSocket connection. + + Returns: + bool: True if the message was successfully sent, False otherwise. + """ + self._logger.spam("AsyncSpeakStreamClient.send_raw ENTER") + + if self._exit_event.is_set(): + self._logger.notice("send_raw exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.send_raw LEAVE") + return False + + if not await self.is_connected(): + self._logger.notice("is_connected is False") + self._logger.debug("AsyncListenWebSocketClient.send LEAVE") + return False + + if self._config.is_inspecting_speak(): + try: + _tmp_json = json.loads(msg) + if "type" in _tmp_json and _tmp_json["type"] == "Flush": + self._last_datagram = None + self._flush_count += 1 + self._logger.debug("Increment Flush count: %d", self._flush_count) + except Exception as e: # pylint: disable=broad-except + self._logger.error("send_raw() failed - Exception: %s", str(e)) + + if self._socket is not None: + try: + await self._socket.send(msg) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"send_raw() exiting gracefully: {e.code}") + self._logger.debug("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"send_raw({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + + self._logger.error("send_raw() failed - ConnectionClosed: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("send_raw() failed - WebSocketException: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("send_raw() failed - Exception: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + + self._logger.spam("send_raw() succeeded") + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + return True + + self._logger.spam("send_raw() failed. socket is None") + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + return False + + # pylint: enable=too-many-return-statements,too-many-branches + + async def flush(self) -> bool: + """ + Flushes the current buffer and returns generated audio + """ + self._logger.spam("AsyncSpeakStreamClient.flush ENTER") + + if self._exit_event.is_set(): + self._logger.notice("flush exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.flush LEAVE") + return False + + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("AsyncSpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("Sending Flush...") + ret = await self.send_control(SpeakWebSocketMessage.Flush) + + if not ret: + self._logger.error("flush failed") + self._logger.spam("AsyncSpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("flush succeeded") + self._logger.spam("AsyncSpeakStreamClient.flush LEAVE") + + return True + + async def finish(self) -> bool: + """ + Closes the WebSocket connection gracefully. + """ + self._logger.debug("AsyncSpeakStreamClient.finish ENTER") + + # signal exit + await self._signal_exit() + + # stop the threads + self._logger.verbose("cancelling tasks...") + try: + # Before cancelling, check if the tasks were created + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + tasks = [] + + if self._flush_thread is not None: + self._flush_thread.cancel() + tasks.append(self._flush_thread) + self._logger.notice("processing _flush_thread cancel...") + + if self._listen_thread is not None: + self._listen_thread.cancel() + tasks.append(self._listen_thread) + self._logger.notice("processing _listen_thread cancel...") + + # Use asyncio.gather to wait for tasks to be cancelled + await asyncio.wait_for( + asyncio.gather(*tasks), timeout=10 + ) # Prevent indefinite waiting + self._logger.notice("threads joined") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("AsyncSpeakStreamClient.finish LEAVE") + return True + + except asyncio.CancelledError as e: + self._logger.error("tasks cancelled error: %s", e) + self._logger.debug("AsyncSpeakStreamClient.finish LEAVE") + return False + + except asyncio.TimeoutError as e: + self._logger.error("tasks cancellation timed out: %s", e) + self._logger.debug("AsyncSpeakStreamClient.finish LEAVE") + return False + + async def _signal_exit(self) -> None: + # send close event + self._logger.verbose("closing socket...") + if self._socket is not None: + self._logger.verbose("send CloseStream...") + try: + # if the socket connection is closed, the following line might throw an error + # need to explicitly use _socket.send (dont use self.send_raw) + await self._socket.send(json.dumps({"type": "CloseStream"})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) + except websockets.exceptions.ConnectionClosed as e: + self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) + except websockets.exceptions.WebSocketException as e: + self._logger.error("_signal_exit - WebSocketException: %s", str(e)) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", str(e)) + + # push close event + try: + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + close=CloseResponse(type=SpeakWebSocketEvents.Close), + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_emit - Exception: %s", e) + + # wait for task to send + await asyncio.sleep(0.5) + + # signal exit + self._exit_event.set() + + # closes the WebSocket connection gracefully + self._logger.verbose("clean up socket...") + if self._socket is not None: + self._logger.verbose("socket.wait_closed...") + try: + await self._socket.close() + except websockets.exceptions.WebSocketException as e: + self._logger.error("socket.wait_closed failed: %s", e) + + self._socket = None # type: ignore + + async def _inspect(self) -> bool: + # auto flush_inspect is generically used to track any messages you might want to snoop on + # place additional logic here to inspect messages of interest + + # for auto flush functionality + # set the last datagram + self._last_datagram = datetime.now() + self._logger.debug( + "AutoFlush last received: %s", + str(self._last_datagram), + ) + + return True diff --git a/deepgram/clients/speak/v1/websocket/client.py b/deepgram/clients/speak/v1/websocket/client.py new file mode 100644 index 00000000..9c9af4f6 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/client.py @@ -0,0 +1,801 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import json +import time +import logging +from typing import Dict, Union, Optional, cast, Any +from datetime import datetime +import threading + +from websockets.sync.client import connect, ClientConnection +import websockets + +from .....utils import verboselogs +from .....options import DeepgramClientOptions +from ...enums import SpeakWebSocketEvents, SpeakWebSocketMessage +from .helpers import convert_to_websocket_url, append_query_params +from ...errors import DeepgramError + +from .response import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + WarningResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import SpeakWebSocketOptions + +ONE_SECOND = 1 +HALF_SECOND = 0.5 +DEEPGRAM_INTERVAL = 5 +PING_INTERVAL = 20 + + +class SpeakWebSocketClient: # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's text-to-speech services over WebSockets. + + This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. + + Args: + config (DeepgramClientOptions): all the options for the client. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _endpoint: str + _websocket_url: str + + _socket: ClientConnection + _exit_event: threading.Event + _lock_send: threading.Lock + _event_handlers: Dict[SpeakWebSocketEvents, list] + + _listen_thread: Union[threading.Thread, None] + _flush_thread: Union[threading.Thread, None] + _lock_flush: threading.Lock + _last_datagram: Optional[datetime] = None + _flush_count: int + + _kwargs: Optional[Dict] = None + _addons: Optional[Dict] = None + _options: Optional[Dict] = None + _headers: Optional[Dict] = None + + def __init__(self, config: DeepgramClientOptions): + if config is None: + raise DeepgramError("Config are required") + + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + + self._config = config + self._endpoint = "v1/speak" + self._lock_send = threading.Lock() + self._lock_flush = threading.Lock() + + self._listen_thread = None + self._flush_thread = None + + # exit + self._exit_event = threading.Event() + + # flush + self._last_datagram = None + self._flush_count = 0 + + # init handlers + self._event_handlers = { + event: [] for event in SpeakWebSocketEvents.__members__.values() + } + self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) + + # pylint: disable=too-many-statements,too-many-branches + def start( + self, + options: Optional[Union[SpeakWebSocketOptions, Dict]] = None, + addons: Optional[Dict] = None, + headers: Optional[Dict] = None, + members: Optional[Dict] = None, + **kwargs, + ) -> bool: + """ + Starts the WebSocket connection for text-to-speech synthesis. + """ + self._logger.debug("SpeakStreamClient.start ENTER") + self._logger.info("options: %s", options) + self._logger.info("addons: %s", addons) + self._logger.info("headers: %s", headers) + self._logger.info("members: %s", members) + self._logger.info("kwargs: %s", kwargs) + + if isinstance(options, SpeakWebSocketOptions) and not options.check(): + self._logger.error("options.check failed") + self._logger.debug("SpeakStreamClient.start LEAVE") + raise DeepgramError("Fatal text-to-speech options error") + + self._addons = addons + self._headers = headers + + # add "members" as members of the class + if members is not None: + self.__dict__.update(members) + + # set kwargs as members of the class + if kwargs is not None: + self._kwargs = kwargs + else: + self._kwargs = {} + + if isinstance(options, SpeakWebSocketOptions): + self._logger.info("SpeakWebSocketOptions switching class -> dict") + self._options = options.to_dict() + elif options is not None: + self._options = options + else: + self._options = {} + + combined_options = self._options + if self._addons is not None: + self._logger.info("merging addons to options") + combined_options.update(self._addons) + self._logger.info("new options: %s", combined_options) + self._logger.debug("combined_options: %s", combined_options) + + combined_headers = self._config.headers + if self._headers is not None: + self._logger.info("merging headers to options") + combined_headers.update(self._headers) + self._logger.info("new headers: %s", combined_headers) + self._logger.debug("combined_headers: %s", combined_headers) + + url_with_params = append_query_params(self._websocket_url, combined_options) + try: + self._socket = connect(url_with_params, additional_headers=combined_headers) + self._exit_event.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # listening thread + self._listen_thread = threading.Thread(target=self._listening) + self._listen_thread.start() + + # flush thread + if self._config.is_auto_flush_speak_enabled(): + self._logger.notice("autoflush is enabled") + self._flush_thread = threading.Thread(target=self._flush) + self._flush_thread.start() + else: + self._logger.notice("autoflush is disabled") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # push open event + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Open), + OpenResponse(type=SpeakWebSocketEvents.Open), + ) + + self._logger.notice("start succeeded") + self._logger.debug("SpeakStreamClient.start LEAVE") + return True + except websockets.ConnectionClosed as e: + self._logger.error("ConnectionClosed in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + + def is_connected(self) -> bool: + """ + Returns the connection status of the WebSocket. + """ + return self._socket is not None + + # pylint: enable=too-many-statements,too-many-branches + + def on( + self, event: SpeakWebSocketEvents, handler + ) -> None: # registers event handlers for specific events + """ + Registers event handlers for specific events. + """ + self._logger.info("event subscribed: %s", event) + if event in SpeakWebSocketEvents.__members__.values() and callable(handler): + self._event_handlers[event].append(handler) + + def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: + """ + Emits events to the registered event handlers. + """ + self._logger.debug("callback handlers for: %s", event) + for handler in self._event_handlers[event]: + handler(self, *args, **kwargs) + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + def _listening( + self, + ) -> None: + """ + Listens for messages from the WebSocket connection. + """ + self._logger.debug("SpeakStreamClient._listening ENTER") + + while True: + try: + if self._exit_event.is_set(): + self._logger.notice("_listening exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + if self._socket is None: + self._logger.warning("socket is empty") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + message = self._socket.recv() + + if message is None: + self._logger.info("message is empty") + continue + + if isinstance(message, bytes): + self._logger.debug("Binary data received") + + # auto flush + if self._config.is_inspecting_speak(): + inspect_res = self._inspect() + if not inspect_res: + self._logger.error("inspect_res failed") + + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), + data=message, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + else: + data = json.loads(message) + response_type = data.get("type") + self._logger.debug( + "response_type: %s, data: %s", response_type, data + ) + + match response_type: + case SpeakWebSocketEvents.Open: + open_result: OpenResponse = OpenResponse.from_json(message) + self._logger.verbose("OpenResponse: %s", open_result) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Open), + open=open_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Metadata: + meta_result: MetadataResponse = MetadataResponse.from_json( + message + ) + self._logger.verbose("MetadataResponse: %s", meta_result) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), + metadata=meta_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Flush: + fl_result: FlushedResponse = FlushedResponse.from_json( + message + ) + self._logger.verbose("FlushedResponse: %s", fl_result) + + # auto flush + if self._config.is_inspecting_speak(): + with self._lock_flush: + self._flush_count -= 1 + self._logger.debug( + "Decrement Flush count: %d", + self._flush_count, + ) + + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Flush), + flushed=fl_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Close: + close_result: CloseResponse = CloseResponse.from_json( + message + ) + self._logger.verbose("CloseResponse: %s", close_result) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + close=close_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Warning: + war_warning: WarningResponse = WarningResponse.from_json( + message + ) + self._logger.verbose("WarningResponse: %s", war_warning) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), + warning=war_warning, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Error: + err_error: ErrorResponse = ErrorResponse.from_json(message) + self._logger.verbose("ErrorResponse: %s", err_error) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case _: + self._logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, + ) + unhandled_error: UnhandledResponse = UnhandledResponse( + type=SpeakWebSocketEvents( + SpeakWebSocketEvents.Unhandled + ), + raw=message, + ) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), + unhandled=unhandled_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + # no need to call self._signal_exit() here because we are already closed + # note: this is different than the listen websocket client + self._logger.notice( + "ConnectionClosed in SpeakStreamClient._listening with code %s: %s", + e.code, + e.reason, + ) + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in SpeakStreamClient._listening with: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in SpeakStreamClient._listening", + f"{e}", + "WebSocketException", + ) + self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), ws_error) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in SpeakStreamClient._listening: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in SpeakStreamClient._listening", + f"{e}", + "Exception", + ) + self._logger.error( + "Exception in SpeakStreamClient._listening: %s", str(e) + ) + self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), e_error) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-branches + def _flush(self) -> None: + self._logger.debug("SpeakStreamClient._flush ENTER") + + delta_in_ms_str = self._config.options.get("auto_flush_speak_delta") + if delta_in_ms_str is None: + self._logger.error("auto_flush_speak_delta is None") + self._logger.debug("SpeakStreamClient._flush LEAVE") + return + delta_in_ms = float(delta_in_ms_str) + + counter = 0 + while True: + try: + counter += 1 + self._exit_event.wait(timeout=HALF_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_flush exiting gracefully") + self._logger.debug("ListenWebSocketClient._flush LEAVE") + return + + if self._socket is None: + self._logger.notice("socket is None, exiting keep_alive") + self._logger.debug("ListenWebSocketClient._flush LEAVE") + return + + if self._last_datagram is None: + self._logger.debug("AutoFlush last_datagram is None") + continue + + with self._lock_flush: + delta = datetime.now() - self._last_datagram + diff_in_ms = delta.total_seconds() * 1000 + self._logger.debug("AutoFlush delta: %f", diff_in_ms) + if diff_in_ms < delta_in_ms: + self._logger.debug("AutoFlush delta is less than threshold") + continue + + self.flush() + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._flush LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._flush LEAVE") + return + + # no need to call self._signal_exit() here because we are already closed + # note: this is different than the listen websocket client + self._logger.notice( + "ConnectionClosed in SpeakStreamClient._flush with code %s: %s", + e.code, + e.reason, + ) + self._logger.debug("SpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in SpeakStreamClient._flush: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in SpeakStreamClient._flush", + f"{e}", + "Exception", + ) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in SpeakStreamClient._flush: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in SpeakStreamClient._flush", + f"{e}", + "Exception", + ) + self._logger.error("Exception in SpeakStreamClient._flush: %s", str(e)) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements,too-many-statements,too-many-branches + + def send_text(self, text_input: str) -> bool: + """ + Sends text to the WebSocket connection to generate audio. + + Args: + text_input (str): The raw text to be synthesized. This function will automatically wrap + the text in a JSON object of type "Speak" with the key "text". + + Returns: + bool: True if the text was successfully sent, False otherwise. + """ + return self.send_raw(json.dumps({"type": "Speak", "text": text_input})) + + def send(self, text_input: str) -> bool: + """ + Alias for send_text. Please see send_text for more information. + """ + return self.send_text(text_input) + + # pylint: disable=unused-argument + def send_control( + self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = "" + ) -> bool: + """ + Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection. + + Args: + msg_type (SpeakWebSocketEvents): The type of control message to send. + (Optional) data (str): The data to send with the control message. + + Returns: + bool: True if the control message was successfully sent, False otherwise. + """ + control_msg = json.dumps({"type": msg_type}) + return self.send_raw(control_msg) + + # pylint: enable=unused-argument + + # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements + def send_raw(self, msg: str) -> bool: + """ + Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. + + Args: + msg (str): The raw message to send over the WebSocket connection. + + Returns: + bool: True if the message was successfully sent, False otherwise. + """ + self._logger.spam("SpeakStreamClient.send_raw ENTER") + + if self._exit_event.is_set(): + self._logger.notice("send exiting gracefully") + self._logger.debug("SpeakStreamClient.send LEAVE") + return False + + if not self.is_connected(): + self._logger.notice("is_connected is False") + self._logger.debug("AsyncListenWebSocketClient.send LEAVE") + return False + + if self._config.is_inspecting_speak(): + try: + _tmp_json = json.loads(msg) + if "type" in _tmp_json and _tmp_json["type"] == "Flush": + with self._lock_flush: + self._last_datagram = None + self._flush_count += 1 + self._logger.debug( + "Increment Flush count: %d", self._flush_count + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("send_raw() failed - Exception: %s", str(e)) + + if self._socket is not None: + with self._lock_send: + try: + self._socket.send(msg) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"send_raw() exiting gracefully: {e.code}") + self._logger.debug("SpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"send_raw({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient.send_raw LEAVE") + if ( + self._config.options.get("termination_exception_send") + == "true" + ): + raise + return True + self._logger.error( + "send_raw() failed - ConnectionClosed: %s", str(e) + ) + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "send_raw() failed - WebSocketException: %s", str(e) + ) + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("send_raw() failed - Exception: %s", str(e)) + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + + self._logger.spam("send_raw() succeeded") + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + return True + + self._logger.spam("send_raw() failed. socket is None") + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + return False + + # pylint: enable=too-many-return-statements,too-many-branches,too-many-statements + + def flush(self) -> bool: + """ + Flushes the current buffer and returns generated audio + """ + self._logger.spam("SpeakStreamClient.flush ENTER") + + if self._exit_event.is_set(): + self._logger.notice("flush exiting gracefully") + self._logger.debug("SpeakStreamClient.flush LEAVE") + return False + + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("SpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("Sending Flush...") + ret = self.send_control(SpeakWebSocketMessage.Flush) + + if not ret: + self._logger.error("flush failed") + self._logger.spam("SpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("flush succeeded") + self._logger.spam("SpeakStreamClient.flush LEAVE") + + return True + + # closes the WebSocket connection gracefully + def finish(self) -> bool: + """ + Closes the WebSocket connection gracefully. + """ + self._logger.spam("SpeakStreamClient.finish ENTER") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # signal exit + self._signal_exit() + + # stop the threads + self._logger.verbose("cancelling tasks...") + if self._flush_thread is not None: + self._flush_thread.join() + self._flush_thread = None + self._logger.notice("processing _flush_thread thread joined") + + if self._listen_thread is not None: + self._listen_thread.join() + self._listen_thread = None + self._logger.notice("listening thread joined") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("SpeakStreamClient.finish LEAVE") + return True + + # signals the WebSocket connection to exit + def _signal_exit(self) -> None: + # closes the WebSocket connection gracefully + self._logger.notice("closing socket...") + if self._socket is not None: + self._logger.notice("sending Close...") + try: + # if the socket connection is closed, the following line might throw an error + # need to explicitly use _socket.send (dont use self.send_raw) + self._socket.send(json.dumps({"type": "CloseStream"})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) + except websockets.exceptions.ConnectionClosed as e: + self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code) + except websockets.exceptions.WebSocketException as e: + self._logger.error("_signal_exit - WebSocketException: %s", str(e)) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", str(e)) + + # push close event + try: + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + CloseResponse(type=SpeakWebSocketEvents.Close), + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", e) + + # wait for task to send + time.sleep(0.5) + + # signal exit + self._exit_event.set() + + # closes the WebSocket connection gracefully + self._logger.verbose("clean up socket...") + if self._socket is not None: + self._logger.verbose("socket.wait_closed...") + try: + self._socket.close() + except websockets.exceptions.WebSocketException as e: + self._logger.error("socket.wait_closed failed: %s", e) + + self._socket = None # type: ignore + + def _inspect(self) -> bool: + # auto flush_inspect is generically used to track any messages you might want to snoop on + # place additional logic here to inspect messages of interest + + # for auto flush functionality + # set the last datagram + with self._lock_flush: + self._last_datagram = datetime.now() + self._logger.debug( + "AutoFlush last received: %s", + str(self._last_datagram), + ) + + return True diff --git a/deepgram/clients/speak/v1/websocket/helpers.py b/deepgram/clients/speak/v1/websocket/helpers.py new file mode 100644 index 00000000..dffcdb06 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/helpers.py @@ -0,0 +1,43 @@ +from urllib.parse import urlparse, urlunparse, parse_qs, urlencode +from typing import Dict, Optional +import re + + +# This function appends query parameters to a URL +def append_query_params(url: str, params: Optional[Dict] = None): + """ + Appends query parameters to a URL + """ + parsed_url = urlparse(url) + query_params = parse_qs(parsed_url.query) + + if params is not None: + for key, value in params.items(): + if value is None: + continue + if isinstance(value, bool): + value = str(value).lower() + if isinstance(value, list): + for item in value: + query_params[key] = query_params.get(key, []) + [str(item)] + else: + query_params[key] = [str(value)] + + updated_query_string = urlencode(query_params, doseq=True) + updated_url = parsed_url._replace(query=updated_query_string).geturl() + return updated_url + + +# This function converts a URL to a WebSocket URL +def convert_to_websocket_url(base_url: str, endpoint: str): + """ + Converts a URL to a WebSocket URL + """ + if re.match(r"^https?://", base_url, re.IGNORECASE): + base_url = base_url.replace("https://", "").replace("http://", "") + if not re.match(r"^wss?://", base_url, re.IGNORECASE): + base_url = "wss://" + base_url + parsed_url = urlparse(base_url) + domain = parsed_url.netloc + websocket_url = urlunparse((parsed_url.scheme, domain, endpoint, "", "", "")) + return websocket_url diff --git a/deepgram/clients/speak/v1/websocket/options.py b/deepgram/clients/speak/v1/websocket/options.py new file mode 100644 index 00000000..a5d78752 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/options.py @@ -0,0 +1,7 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from ..options import SpeakOptions + +SpeakWebSocketOptions = SpeakOptions diff --git a/deepgram/clients/speak/v1/websocket/response.py b/deepgram/clients/speak/v1/websocket/response.py new file mode 100644 index 00000000..87f1d466 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/response.py @@ -0,0 +1,185 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + + +from typing import Optional +import io + +from dataclasses import dataclass, field +from dataclasses_json import config as dataclass_config, DataClassJsonMixin + +# Speak Response Types: + + +@dataclass +class SpeakWebSocketResponse( + DataClassJsonMixin +): # pylint: disable=too-many-instance-attributes + """ + A class for representing a response from the speak (streaming) endpoint. + """ + + content_type: str = "" + request_id: str = "" + model_uuid: str = "" + model_name: str = "" + date: str = "" + stream: Optional[io.BytesIO] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + # this is a hack to make the response look like a dict because of the io.BytesIO object + # otherwise it will throw an exception on printing + def __str__(self) -> str: + my_dict = self.to_dict() + return my_dict.__str__() + + +@dataclass +class OpenResponse(DataClassJsonMixin): + """ + Open Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class MetadataResponse(DataClassJsonMixin): + """ + Metadata object + """ + + request_id: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class FlushedResponse(DataClassJsonMixin): + """ + Flushed Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class CloseResponse(DataClassJsonMixin): + """ + Close Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class ErrorResponse(DataClassJsonMixin): + """ + Error Message from the Deepgram Platform + """ + + description: str = "" + message: str = "" + type: str = "" + variant: Optional[str] = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class WarningResponse(DataClassJsonMixin): + """ + Warning Message from the Deepgram Platform + """ + + warn_code: str = "" + warn_msg: str = "" + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +# Unhandled Message + + +@dataclass +class UnhandledResponse(DataClassJsonMixin): + """ + Unhandled Message from the Deepgram Platform + """ + + type: str = "" + raw: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) diff --git a/deepgram/options.py b/deepgram/options.py index 35ae8489..e92e955c 100644 --- a/deepgram/options.py +++ b/deepgram/options.py @@ -14,7 +14,7 @@ from .errors import DeepgramApiKeyError -class DeepgramClientOptions: +class DeepgramClientOptions: # pylint: disable=too-many-instance-attributes """ Represents options for configuring a Deepgram client. @@ -29,7 +29,8 @@ class DeepgramClientOptions: """ _logger: verboselogs.VerboseLogger - _inspect: bool = False + _inspect_listen: bool = False + _inspect_speak: bool = False def __init__( self, @@ -60,8 +61,10 @@ def __init__( options = {} self.options = options - if self.is_auto_flush_enabled(): - self._inspect = True + if self.is_auto_flush_reply_enabled(): + self._inspect_listen = True + if self.is_auto_flush_speak_enabled(): + self._inspect_speak = True def set_apikey(self, api_key: str): """ @@ -100,18 +103,37 @@ def is_keep_alive_enabled(self) -> bool: "keep_alive", False ) - def is_auto_flush_enabled(self) -> bool: + def is_auto_flush_reply_enabled(self) -> bool: """ - is_auto_flush_enabled: Returns True if the client is configured to auto-flush. + is_auto_flush_reply_enabled: Returns True if the client is configured to auto-flush for listen. """ - auto_flush_delta = float(self.options.get("auto_flush_reply_delta", 0)) - return isinstance(auto_flush_delta, numbers.Number) and auto_flush_delta > 0 + auto_flush_reply_delta = float(self.options.get("auto_flush_reply_delta", 0)) + return ( + isinstance(auto_flush_reply_delta, numbers.Number) + and auto_flush_reply_delta > 0 + ) + + def is_auto_flush_speak_enabled(self) -> bool: + """ + is_auto_flush_speak_enabled: Returns True if the client is configured to auto-flush for speak. + """ + auto_flush_speak_delta = float(self.options.get("auto_flush_speak_delta", 0)) + return ( + isinstance(auto_flush_speak_delta, numbers.Number) + and auto_flush_speak_delta > 0 + ) + + def is_inspecting_listen(self) -> bool: + """ + is_inspecting_listen: Returns True if the client is inspecting listen messages. + """ + return self._inspect_listen - def is_inspecting_messages(self) -> bool: + def is_inspecting_speak(self) -> bool: """ - is_inspecting_messages: Returns True if the client is inspecting messages. + is_inspecting_speak: Returns True if the client is inspecting speak messages. """ - return self._inspect + return self._inspect_speak class ClientOptionsFromEnv( diff --git a/examples/requirements-examples.txt b/examples/requirements-examples.txt index ee220abd..8b772e0a 100644 --- a/examples/requirements-examples.txt +++ b/examples/requirements-examples.txt @@ -4,4 +4,7 @@ python-dotenv # streaming libs -pyaudio \ No newline at end of file +pyaudio +playsound3==2.2.1 +sounddevice==0.4.7 +numpy==2.0.1 diff --git a/examples/speech-to-text/rest/url/main.py b/examples/speech-to-text/rest/url/main.py index 85ebe16d..7831bf94 100644 --- a/examples/speech-to-text/rest/url/main.py +++ b/examples/speech-to-text/rest/url/main.py @@ -22,8 +22,8 @@ def main(): try: - # STEP 1 Create a Deepgram client using the API key from environment variables - deepgram: DeepgramClient = DeepgramClient("", ClientOptionsFromEnv()) + # STEP 1 Create a Deepgram client using the DEEPGRAM_API_KEY from your environment variables + deepgram: DeepgramClient = DeepgramClient() # STEP 2 Call the transcribe_url method on the rest class options: PrerecordedOptions = PrerecordedOptions( diff --git a/examples/text-to-speech/websocket/async_complete/main.py b/examples/text-to-speech/websocket/async_complete/main.py new file mode 100644 index 00000000..50043d91 --- /dev/null +++ b/examples/text-to-speech/websocket/async_complete/main.py @@ -0,0 +1,102 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import asyncio +import sounddevice as sd +import numpy as np +import time +from deepgram.utils import verboselogs + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakWebSocketEvents, + SpeakOptions, +) + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + + +async def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions( + # # options={"auto_flush_speak_delta": "500"}, + # verbose=verboselogs.SPAM, + # ) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.asyncwebsocket.v("1") + + async def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + async def on_binary_data(self, data, **kwargs): + print("Received binary data") + array = np.frombuffer(data, dtype=np.int16) + sd.play(array, 48000) + sd.wait() + + async def on_metadata(self, metadata, **kwargs): + print(f"\n\n{metadata}\n\n") + + async def on_flush(self, flush, **kwargs): + print(f"\n\n{flush}\n\n") + + async def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + async def on_warning(self, warning, **kwargs): + print(f"\n\n{warning}\n\n") + + async def on_error(self, error, **kwargs): + print(f"\n\n{error}\n\n") + + async def on_unhandled(self, unhandled, **kwargs): + print(f"\n\n{unhandled}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Metadata, on_metadata) + dg_connection.on(SpeakWebSocketEvents.Flush, on_flush) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + dg_connection.on(SpeakWebSocketEvents.Error, on_error) + dg_connection.on(SpeakWebSocketEvents.Warning, on_warning) + dg_connection.on(SpeakWebSocketEvents.Unhandled, on_unhandled) + + # connect to websocket + options = SpeakOptions( + model="aura-asteria-en", + encoding="linear16", + container="none", + sample_rate=48000, + ) + + print("\n\nPress Enter to stop...\n\n") + if await dg_connection.start(options) is False: + print("Failed to start connection") + return + + # send the text to Deepgram + await dg_connection.send_text(TTS_TEXT) + # if auto_flush_speak_delta is not used, you must flush the connection by calling flush() + await dg_connection.flush() + + # Indicate that we've finished + await asyncio.sleep(7) + await dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/text-to-speech/websocket/complete/main.py b/examples/text-to-speech/websocket/complete/main.py new file mode 100644 index 00000000..589207c7 --- /dev/null +++ b/examples/text-to-speech/websocket/complete/main.py @@ -0,0 +1,104 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import time +from deepgram.utils import verboselogs + +import sounddevice as sd +import numpy as np + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakWebSocketEvents, + SpeakOptions, +) + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + + +def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions( + # # options={"auto_flush_speak_delta": "500"}, + # verbose=verboselogs.SPAM, + # ) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.websocket.v("1") + + def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + def on_binary_data(self, data, **kwargs): + print("Received binary data") + array = np.frombuffer(data, dtype=np.int16) + sd.play(array, 48000) + sd.wait() + + def on_metadata(self, metadata, **kwargs): + print(f"\n\n{metadata}\n\n") + + def on_flush(self, flush, **kwargs): + print(f"\n\n{flush}\n\n") + + def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + def on_warning(self, warning, **kwargs): + print(f"\n\n{warning}\n\n") + + def on_error(self, error, **kwargs): + print(f"\n\n{error}\n\n") + + def on_unhandled(self, unhandled, **kwargs): + print(f"\n\n{unhandled}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Metadata, on_metadata) + dg_connection.on(SpeakWebSocketEvents.Flush, on_flush) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + dg_connection.on(SpeakWebSocketEvents.Error, on_error) + dg_connection.on(SpeakWebSocketEvents.Warning, on_warning) + dg_connection.on(SpeakWebSocketEvents.Unhandled, on_unhandled) + + # connect to websocket + options = SpeakOptions( + model="aura-asteria-en", + encoding="linear16", + container="none", + sample_rate=48000, + ) + + print("\n\nPress Enter to stop...\n\n") + if dg_connection.start(options) is False: + print("Failed to start connection") + return + + # send the text to Deepgram + dg_connection.send_text(TTS_TEXT) + # if auto_flush_speak_delta is not used, you must flush the connection by calling flush() + dg_connection.flush() + + # Indicate that we've finished + time.sleep(5) + print("\n\nPress Enter to stop...\n\n") + input() + dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/text-to-speech/websocket/simple/main-containerized.py b/examples/text-to-speech/websocket/simple/main-containerized.py new file mode 100644 index 00000000..2139cbf2 --- /dev/null +++ b/examples/text-to-speech/websocket/simple/main-containerized.py @@ -0,0 +1,84 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import time +from deepgram.utils import verboselogs + +from playsound3 import playsound + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakWebSocketEvents, + SpeakOptions, +) + +AUDIO_FILE = "output.wav" +TTS_TEXT = "Hello, this is a text to speech example using Deepgram. How are you doing today? I am fine thanks for asking." + + +def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions( + # # options={"auto_flush_speak_delta": "500"}, + # verbose=verboselogs.SPAM, + # ) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.websocket.v("1") + + def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + def on_binary_data(self, data, **kwargs): + print("Received binary data") + with open(AUDIO_FILE, "wb") as f: + f.write(data) + playsound(AUDIO_FILE) + + def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + + # connect to websocket + options = SpeakOptions( + model="aura-asteria-en", + encoding="linear16", + container="wav", + sample_rate=48000, + ) + + print("\n\nPress Enter to stop...\n\n") + if dg_connection.start(options) is False: + print("Failed to start connection") + return + + # send the text to Deepgram + dg_connection.send_text(TTS_TEXT) + # if auto_flush_speak_delta is not used, you must flush the connection by calling flush() + dg_connection.flush() + + # Indicate that we've finished + time.sleep(7) + print("\n\nPress Enter to stop...\n\n") + input() + dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/text-to-speech/websocket/simple/main-non-containerized.py b/examples/text-to-speech/websocket/simple/main-non-containerized.py new file mode 100644 index 00000000..08b44817 --- /dev/null +++ b/examples/text-to-speech/websocket/simple/main-non-containerized.py @@ -0,0 +1,84 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import time +from deepgram.utils import verboselogs + +import sounddevice as sd +import numpy as np + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakWebSocketEvents, + SpeakOptions, +) + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram. How are you doing today? I am fine thanks for asking." + + +def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions( + # # options={"auto_flush_speak_delta": "500"}, + # verbose=verboselogs.SPAM, + # ) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.websocket.v("1") + + def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + def on_binary_data(self, data, **kwargs): + print("Received binary data") + array = np.frombuffer(data, dtype=np.int16) + sd.play(array, 48000) + sd.wait() + + def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + + # connect to websocket + options = SpeakOptions( + model="aura-asteria-en", + encoding="linear16", + container="none", + sample_rate=48000, + ) + + print("\n\nPress Enter to stop...\n\n") + if dg_connection.start(options) is False: + print("Failed to start connection") + return + + # send the text to Deepgram + dg_connection.send_text(TTS_TEXT) + # if auto_flush_speak_delta is not used, you must flush the connection by calling flush() + dg_connection.flush() + + # Indicate that we've finished + time.sleep(7) + print("\n\nPress Enter to stop...\n\n") + input() + dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + main()