From afa5d602f35e7c2d6186003f55e57825929ad752 Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Mon, 15 Jul 2024 10:58:47 -0700 Subject: [PATCH] [HOLD] Reintroduce TTS WS --- deepgram/__init__.py | 32 +- deepgram/client.py | 32 +- deepgram/clients/__init__.py | 32 +- deepgram/clients/speak/__init__.py | 22 +- deepgram/clients/speak/client.py | 44 +- deepgram/clients/speak/v1/__init__.py | 31 +- .../clients/speak/v1/websocket/__init__.py | 17 + .../speak/v1/websocket/async_client.py | 611 ++++++++++++++++++ deepgram/clients/speak/v1/websocket/client.py | 558 ++++++++++++++++ .../clients/speak/v1/websocket/helpers.py | 43 ++ .../clients/speak/v1/websocket/options.py | 7 + .../clients/speak/v1/websocket/response.py | 185 ++++++ examples/requirements-examples.txt | 5 +- .../websocket/async_interactive/main.py | 97 +++ .../websocket/interactive/main.py | 98 +++ .../websocket/simple/main-containerized.py | 81 +++ .../simple/main-non-containerized.py | 78 +++ 17 files changed, 1875 insertions(+), 98 deletions(-) create mode 100644 deepgram/clients/speak/v1/websocket/__init__.py create mode 100644 deepgram/clients/speak/v1/websocket/async_client.py create mode 100644 deepgram/clients/speak/v1/websocket/client.py create mode 100644 deepgram/clients/speak/v1/websocket/helpers.py create mode 100644 deepgram/clients/speak/v1/websocket/options.py create mode 100644 deepgram/clients/speak/v1/websocket/response.py create mode 100644 examples/text-to-speech/websocket/async_interactive/main.py create mode 100644 examples/text-to-speech/websocket/interactive/main.py create mode 100644 examples/text-to-speech/websocket/simple/main-containerized.py create mode 100644 examples/text-to-speech/websocket/simple/main-non-containerized.py diff --git a/deepgram/__init__.py b/deepgram/__init__.py index f0984073..f5971186 100644 --- a/deepgram/__init__.py +++ b/deepgram/__init__.py @@ -96,7 +96,7 @@ from .client import ( SpeakOptions, 
SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, @@ -115,21 +115,21 @@ SpeakRESTResponse, ) -# ## speak WebSocket -# from .client import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .client import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## speak WebSocket +from .client import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, +) +from .client import ( + SpeakWebSocketResponse, + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage from .client import ManageClient, AsyncManageClient diff --git a/deepgram/client.py b/deepgram/client.py index 05ce0bd7..7f8daf45 100644 --- a/deepgram/client.py +++ b/deepgram/client.py @@ -100,7 +100,7 @@ from .clients import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, @@ -119,21 +119,21 @@ SpeakRESTResponse, ) -# ## speak WebSocket -# from .clients import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .clients import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## speak WebSocket +from .clients import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, +) +from .clients import ( + SpeakWebSocketResponse, + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage client classes/input from .clients import ManageClient, AsyncManageClient diff --git a/deepgram/clients/__init__.py b/deepgram/clients/__init__.py index 8aa1661e..4043e57e 100644 --- a/deepgram/clients/__init__.py +++ 
b/deepgram/clients/__init__.py @@ -105,7 +105,7 @@ from .speak import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, @@ -125,21 +125,21 @@ SpeakRESTResponse, ) -# ## text-to-speech WebSocket -# from .speak import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .speak import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## text-to-speech WebSocket +from .speak import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, +) +from .speak import ( + SpeakWebSocketResponse, + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage from .manage import ManageClient, AsyncManageClient diff --git a/deepgram/clients/speak/__init__.py b/deepgram/clients/speak/__init__.py index 07d8f42e..edf4d385 100644 --- a/deepgram/clients/speak/__init__.py +++ b/deepgram/clients/speak/__init__.py @@ -9,13 +9,13 @@ SpeakClient, # backward compat SpeakRESTClient, AsyncSpeakRESTClient, - # SpeakWebSocketClient, - # AsyncSpeakWebSocketClient, + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, ) from .client import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWebSocketOptions, FileSource, SpeakRestSource, SpeakSource, @@ -23,12 +23,12 @@ from .client import ( SpeakResponse, # backward compat SpeakRESTResponse, - # SpeakWebSocketResponse, - # OpenResponse, - # MetadataResponse, - # FlushedResponse, - # CloseResponse, - # UnhandledResponse, - # WarningResponse, - # ErrorResponse, + SpeakWebSocketResponse, + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, ) diff --git a/deepgram/clients/speak/client.py b/deepgram/clients/speak/client.py index fc5e902e..e429fba0 100644 
--- a/deepgram/clients/speak/client.py +++ b/deepgram/clients/speak/client.py @@ -5,27 +5,27 @@ from .v1 import ( SpeakRESTClient as SpeakRESTClientLatest, AsyncSpeakRESTClient as AsyncSpeakRESTClientLatest, - # SpeakWebSocketClient as SpeakWebSocketClientLatest, - # AsyncSpeakWebSocketClient as AsyncSpeakWebSocketClientLatest, + SpeakWebSocketClient as SpeakWebSocketClientLatest, + AsyncSpeakWebSocketClient as AsyncSpeakWebSocketClientLatest, ) from .v1 import ( SpeakOptions as SpeakOptionsLatest, SpeakRESTOptions as SpeakRESTOptionsLatest, - # SpeakWebSocketOptions as SpeakWebSocketOptionsLatest, + SpeakWebSocketOptions as SpeakWebSocketOptionsLatest, FileSource as FileSourceLatest, SpeakRestSource as SpeakRestSourceLatest, SpeakSource as SpeakSourceLatest, ) from .v1 import ( SpeakRESTResponse as SpeakRESTResponseLatest, - # SpeakWebSocketResponse as SpeakWebSocketResponseLatest, - # OpenResponse as OpenResponseLatest, - # MetadataResponse as MetadataResponseLatest, - # FlushedResponse as FlushedResponseLatest, - # CloseResponse as CloseResponseLatest, - # UnhandledResponse as UnhandledResponseLatest, - # WarningResponse as WarningResponseLatest, - # ErrorResponse as ErrorResponseLatest, + SpeakWebSocketResponse as SpeakWebSocketResponseLatest, + OpenResponse as OpenResponseLatest, + MetadataResponse as MetadataResponseLatest, + FlushedResponse as FlushedResponseLatest, + CloseResponse as CloseResponseLatest, + UnhandledResponse as UnhandledResponseLatest, + WarningResponse as WarningResponseLatest, + ErrorResponse as ErrorResponseLatest, ) # The client.py points to the current supported version in the SDK. 
@@ -35,21 +35,21 @@ # input SpeakOptions = SpeakOptionsLatest SpeakRESTOptions = SpeakRESTOptionsLatest -# SpeakWebSocketOptions = SpeakWebSocketOptionsLatest +SpeakWebSocketOptions = SpeakWebSocketOptionsLatest SpeakRestSource = SpeakRestSourceLatest FileSource = FileSourceLatest SpeakSource = SpeakSourceLatest # output SpeakRESTResponse = SpeakRESTResponseLatest -# SpeakWebSocketResponse = SpeakWebSocketResponseLatest -# OpenResponse = OpenResponseLatest -# MetadataResponse = MetadataResponseLatest -# FlushedResponse = FlushedResponseLatest -# CloseResponse = CloseResponseLatest -# UnhandledResponse = UnhandledResponseLatest -# WarningResponse = WarningResponseLatest -# ErrorResponse = ErrorResponseLatest +SpeakWebSocketResponse = SpeakWebSocketResponseLatest +OpenResponse = OpenResponseLatest +MetadataResponse = MetadataResponseLatest +FlushedResponse = FlushedResponseLatest +CloseResponse = CloseResponseLatest +UnhandledResponse = UnhandledResponseLatest +WarningResponse = WarningResponseLatest +ErrorResponse = ErrorResponseLatest # backward compatibility @@ -59,5 +59,5 @@ # clients SpeakRESTClient = SpeakRESTClientLatest AsyncSpeakRESTClient = AsyncSpeakRESTClientLatest -# SpeakWebSocketClient = SpeakWebSocketClientLatest -# AsyncSpeakWebSocketClient = AsyncSpeakWebSocketClientLatest +SpeakWebSocketClient = SpeakWebSocketClientLatest +AsyncSpeakWebSocketClient = AsyncSpeakWebSocketClientLatest diff --git a/deepgram/clients/speak/v1/__init__.py b/deepgram/clients/speak/v1/__init__.py index 97e63d7a..b8fa18eb 100644 --- a/deepgram/clients/speak/v1/__init__.py +++ b/deepgram/clients/speak/v1/__init__.py @@ -11,10 +11,9 @@ SpeakSource, FileSource, ) - -# from .websocket import ( -# SpeakWebSocketOptions, -# ) +from .websocket import ( + SpeakWebSocketOptions, +) from ....options import DeepgramClientOptions, ClientOptionsFromEnv # rest @@ -22,15 +21,15 @@ from .rest import SpeakRESTResponse -# # websocket -# from .websocket import SpeakWebSocketClient, 
AsyncSpeakWebSocketClient -# from .websocket import ( -# SpeakWebSocketResponse, -# OpenResponse, -# MetadataResponse, -# FlushedResponse, -# CloseResponse, -# UnhandledResponse, -# WarningResponse, -# ErrorResponse, -# ) +# websocket +from .websocket import SpeakWebSocketClient, AsyncSpeakWebSocketClient +from .websocket import ( + SpeakWebSocketResponse, + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, +) diff --git a/deepgram/clients/speak/v1/websocket/__init__.py b/deepgram/clients/speak/v1/websocket/__init__.py new file mode 100644 index 00000000..e6698987 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from .client import SpeakWebSocketClient +from .async_client import AsyncSpeakWebSocketClient +from .response import ( + SpeakWebSocketResponse, + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, +) +from .options import SpeakWebSocketOptions diff --git a/deepgram/clients/speak/v1/websocket/async_client.py b/deepgram/clients/speak/v1/websocket/async_client.py new file mode 100644 index 00000000..7baea70b --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/async_client.py @@ -0,0 +1,611 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. 
+# SPDX-License-Identifier: MIT
+import asyncio
+import json
+import logging
+from typing import Dict, Union, Optional, cast, Any
+import threading
+
+import websockets
+from websockets.client import WebSocketClientProtocol
+
+from .....utils import verboselogs
+from .....options import DeepgramClientOptions
+from ...enums import SpeakWebSocketEvents
+from .helpers import convert_to_websocket_url, append_query_params
+from ...errors import DeepgramError
+
+from .response import (
+    OpenResponse,
+    MetadataResponse,
+    FlushedResponse,
+    CloseResponse,
+    WarningResponse,
+    ErrorResponse,
+    UnhandledResponse,
+)
+from .options import SpeakWebSocketOptions
+
+
+class AsyncSpeakWebSocketClient:  # pylint: disable=too-many-instance-attributes
+    """
+    Client for interacting with Deepgram's text-to-speech services over WebSockets.
+
+    This class provides methods to establish a WebSocket connection for TTS synthesis
+    and handle real-time TTS synthesis events. Received binary frames are emitted as
+    ``AudioData`` events; received JSON frames are dispatched by their ``type`` field.
+
+    Args:
+        config (DeepgramClientOptions): all the options for the client.
+
+    Raises:
+        DeepgramError: if ``config`` is None.
+    """
+
+    _logger: verboselogs.VerboseLogger
+    _config: DeepgramClientOptions
+    _endpoint: str
+    _websocket_url: str
+
+    # None until start() connects, and again after _signal_exit() closes it
+    _socket: Optional[WebSocketClientProtocol]
+    _exit_event: asyncio.Event
+    _event_handlers: Dict[SpeakWebSocketEvents, list]
+
+    # despite the name this is an asyncio.Task, not an OS thread
+    _listen_thread: Union[asyncio.Task, None]
+
+    _kwargs: Optional[Dict] = None
+    _addons: Optional[Dict] = None
+    _options: Optional[Dict] = None
+    _headers: Optional[Dict] = None
+
+    def __init__(self, config: DeepgramClientOptions):
+        if config is None:
+            raise DeepgramError("Config are required")
+
+        self._logger = verboselogs.VerboseLogger(__name__)
+        self._logger.addHandler(logging.StreamHandler())
+        self._logger.setLevel(config.verbose)
+
+        self._config = config
+        self._endpoint = "v1/speak"
+
+        # Initialise the socket explicitly so that send()/flush()/finish() called
+        # before start() take the "socket is None" path instead of AttributeError.
+        self._socket = None
+        self._listen_thread = None
+
+        # exit
+        self._exit_event = asyncio.Event()
+
+        self._event_handlers = {
+            event: [] for event in SpeakWebSocketEvents.__members__.values()
+        }
+        self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint)
+
+    # pylint: disable=too-many-branches,too-many-statements
+    async def start(
+        self,
+        options: Optional[Union[SpeakWebSocketOptions, Dict]] = None,
+        addons: Optional[Dict] = None,
+        headers: Optional[Dict] = None,
+        members: Optional[Dict] = None,
+        **kwargs,
+    ) -> bool:
+        """
+        Starts the WebSocket connection for text-to-speech synthesis.
+
+        Args:
+            options: query parameters, either a SpeakWebSocketOptions or a plain dict.
+            addons: extra query parameters merged over ``options`` for this connection.
+            headers: extra HTTP headers merged over the configured headers for this
+                connection only.
+            members: entries added to this instance's ``__dict__``.
+            **kwargs: forwarded as keyword arguments to every event handler.
+
+        Returns:
+            True when the socket is connected and the listener task is running;
+            False when connecting failed.
+
+        Raises:
+            DeepgramError: if ``options`` is a SpeakWebSocketOptions whose check() fails.
+            Exception: the connection error is re-raised instead of returning False
+                when the config option ``termination_exception_connect`` is "true".
+        """
+        self._logger.debug("AsyncSpeakStreamClient.start ENTER")
+        self._logger.info("options: %s", options)
+        self._logger.info("addons: %s", addons)
+        self._logger.info("headers: %s", headers)
+        self._logger.info("members: %s", members)
+        self._logger.info("kwargs: %s", kwargs)
+
+        if isinstance(options, SpeakWebSocketOptions) and not options.check():
+            self._logger.error("options.check failed")
+            self._logger.debug("SpeakStreamClient.start LEAVE")
+            raise DeepgramError("Fatal text-to-speech options error")
+
+        self._addons = addons
+        self._headers = headers
+
+        # add "members" as members of the class
+        if members is not None:
+            self.__dict__.update(members)
+
+        # kwargs are replayed to every handler; **kwargs is always a dict, never None
+        self._kwargs = kwargs
+
+        if isinstance(options, SpeakWebSocketOptions):
+            self._logger.info("SpeakWebSocketOptions switching class -> dict")
+            self._options = options.to_dict()
+        elif options is not None:
+            self._options = options
+        else:
+            self._options = {}
+
+        # Merge into a copy: updating self._options in place would also mutate the
+        # dict object the caller passed in as `options`.
+        combined_options = dict(self._options)
+        if self._addons is not None:
+            self._logger.info("merging addons to options")
+            combined_options.update(self._addons)
+            self._logger.info("new options: %s", combined_options)
+        self._logger.debug("combined_options: %s", combined_options)
+
+        # Merge into a copy: updating self._config.headers in place would leak the
+        # per-connection headers into every later connection using this config.
+        combined_headers = dict(self._config.headers or {})
+        if self._headers is not None:
+            self._logger.info("merging headers to options")
+            combined_headers.update(self._headers)
+            self._logger.info("new headers: %s", combined_headers)
+        self._logger.debug("combined_headers: %s", combined_headers)
+
+        url_with_params = append_query_params(self._websocket_url, combined_options)
+
+        try:
+            self._socket = await websockets.connect(
+                url_with_params, extra_headers=combined_headers
+            )
+            self._exit_event.clear()
+
+            # debug the threads
+            for thread in threading.enumerate():
+                self._logger.debug("after running thread: %s", thread.name)
+            self._logger.debug("number of active threads: %s", threading.active_count())
+
+            # listen thread
+            self._listen_thread = asyncio.create_task(self._listening())
+
+            # debug the threads
+            for thread in threading.enumerate():
+                self._logger.debug("after running thread: %s", thread.name)
+            self._logger.debug("number of active threads: %s", threading.active_count())
+
+            # push open event
+            await self._emit(
+                SpeakWebSocketEvents(SpeakWebSocketEvents.Open),
+                OpenResponse(type=SpeakWebSocketEvents.Open),
+            )
+
+            self._logger.notice("start succeeded")
+            self._logger.debug("AsyncSpeakStreamClient.start LEAVE")
+            return True
+        except websockets.ConnectionClosed as e:
+            self._logger.error(
+                "ConnectionClosed in AsyncSpeakStreamClient.start: %s", e
+            )
+            self._logger.debug("AsyncSpeakStreamClient.start LEAVE")
+            if self._config.options.get("termination_exception_connect") == "true":
+                raise
+            return False
+        except websockets.exceptions.WebSocketException as e:
+            self._logger.error(
+                "WebSocketException in AsyncSpeakStreamClient.start: %s", e
+            )
+            self._logger.debug("AsyncSpeakStreamClient.start LEAVE")
+            if self._config.options.get("termination_exception_connect") == "true":
+                raise
+            return False
+        except Exception as e:  # pylint: disable=broad-except
+            # generic failure: do not label it as a WebSocketException in the log
+            self._logger.error("Exception in AsyncSpeakStreamClient.start: %s", e)
+            self._logger.debug("AsyncSpeakStreamClient.start LEAVE")
+            if self._config.options.get("termination_exception_connect") == "true":
+                raise
+            return False
+
+    # pylint: enable=too-many-branches,too-many-statements
+
+    def on(self, event: SpeakWebSocketEvents, handler) -> None:
+        """
+        Registers event handlers for specific events.
+
+        The handler is stored only if ``event`` is a SpeakWebSocketEvents member,
+        ``handler`` is callable, and it is not already registered for that event.
+        Handlers are invoked as ``handler(client, *args, **kwargs)`` and the result
+        is scheduled as an asyncio task, so they must be coroutine functions.
+        """
+        self._logger.info("event subscribed: %s", event)
+        if event in SpeakWebSocketEvents.__members__.values() and callable(handler):
+            if handler not in self._event_handlers[event]:
+                self._event_handlers[event].append(handler)
+
+    # triggers the registered event handlers for a specific event
+    async def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None:
+        """
+        Emits events to the registered event handlers.
+
+        All handlers for ``event`` run concurrently and are awaited together. A
+        failing handler does not abort the others; its exception is logged.
+        """
+        self._logger.debug("AsyncSpeakStreamClient._emit ENTER")
+        self._logger.debug("callback handlers for: %s", event)
+
+        # debug the threads
+        for thread in threading.enumerate():
+            self._logger.debug("after running thread: %s", thread.name)
+        self._logger.debug("number of active threads: %s", threading.active_count())
+
+        tasks = []
+        for handler in self._event_handlers[event]:
+            task = asyncio.create_task(handler(self, *args, **kwargs))
+            tasks.append(task)
+
+        if tasks:
+            self._logger.debug("waiting for tasks to finish...")
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+            # return_exceptions=True hands failures back as values; surface them
+            # in the log instead of dropping them silently
+            for result in results:
+                if isinstance(result, BaseException):
+                    self._logger.error(
+                        "event handler for %s raised: %s", event, result
+                    )
+            tasks.clear()
+
+        # debug the threads
+        for thread in threading.enumerate():
+            self._logger.debug("after running thread: %s", thread.name)
+        self._logger.debug("number of active threads: %s", threading.active_count())
+
+        self._logger.debug("AsyncSpeakStreamClient._emit LEAVE")
+
+    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
+    async def _listening(self) -> None:
+        """
+        Listens for messages from the WebSocket connection.
+
+        Runs until the exit event is set, the socket is None, or the connection
+        ends. Binary frames are emitted as AudioData; text frames are parsed as JSON
+        and emitted according to their "type". On an abnormal close or any other
+        error an Error event is emitted and the connection is shut down; the error
+        is re-raised only when the config option ``termination_exception`` is "true".
+        """
+        self._logger.debug("AsyncSpeakStreamClient._listening ENTER")
+
+        while True:
+            try:
+                if self._exit_event.is_set():
+                    self._logger.notice("_listening exiting gracefully")
+                    self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")
+                    return
+
+                if self._socket is None:
+                    self._logger.warning("socket is empty")
+                    self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")
+                    return
+
+                message = await self._socket.recv()
+
+                if message is None:
+                    self._logger.spam("message is None")
+                    continue
+
+                if isinstance(message, bytes):
+                    # raw audio payload: hand it to the handlers untouched
+                    self._logger.debug("Binary data received")
+                    await self._emit(
+                        SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData),
+                        data=message,
+                        **dict(cast(Dict[Any, Any], self._kwargs)),
+                    )
+                else:
+                    data = json.loads(message)
+                    response_type = data.get("type")
+                    self._logger.debug(
+                        "response_type: %s, data: %s", response_type, data
+                    )
+
+                    match response_type:
+                        case SpeakWebSocketEvents.Open:
+                            open_result: OpenResponse = OpenResponse.from_json(message)
+                            self._logger.verbose("OpenResponse: %s", open_result)
+                            await self._emit(
+                                SpeakWebSocketEvents(SpeakWebSocketEvents.Open),
+                                open=open_result,
+                                **dict(cast(Dict[Any, Any], self._kwargs)),
+                            )
+                        case SpeakWebSocketEvents.Metadata:
+                            meta_result: MetadataResponse = MetadataResponse.from_json(
+                                message
+                            )
+                            self._logger.verbose("MetadataResponse: %s", meta_result)
+                            await self._emit(
+                                SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata),
+                                metadata=meta_result,
+                                **dict(cast(Dict[Any, Any], self._kwargs)),
+                            )
+                        case SpeakWebSocketEvents.Flush:
+                            fl_result: FlushedResponse = FlushedResponse.from_json(
+                                message
+                            )
+                            self._logger.verbose("FlushedResponse: %s", fl_result)
+                            await self._emit(
+                                SpeakWebSocketEvents(SpeakWebSocketEvents.Flush),
+                                flushed=fl_result,
+                                **dict(cast(Dict[Any, Any], self._kwargs)),
+                            )
+                        case SpeakWebSocketEvents.Close:
+                            close_result: CloseResponse = CloseResponse.from_json(
+                                message
+                            )
+                            self._logger.verbose("CloseResponse: %s", close_result)
+                            await self._emit(
+                                SpeakWebSocketEvents(SpeakWebSocketEvents.Close),
+                                close=close_result,
+                                **dict(cast(Dict[Any, Any], self._kwargs)),
+                            )
+                        case SpeakWebSocketEvents.Warning:
+                            war_warning: WarningResponse = WarningResponse.from_json(
+                                message
+                            )
+                            self._logger.verbose("WarningResponse: %s", war_warning)
+                            await self._emit(
+                                SpeakWebSocketEvents(SpeakWebSocketEvents.Warning),
+                                warning=war_warning,
+                                **dict(cast(Dict[Any, Any], self._kwargs)),
+                            )
+                        case SpeakWebSocketEvents.Error:
+                            err_error: ErrorResponse = ErrorResponse.from_json(message)
+                            self._logger.verbose("ErrorResponse: %s", err_error)
+                            await self._emit(
+                                SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
+                                error=err_error,
+                                **dict(cast(Dict[Any, Any], self._kwargs)),
+                            )
+                        case _:
+                            self._logger.warning(
+                                "Unknown Message: response_type: %s, data: %s",
+                                response_type,
+                                data,
+                            )
+                            unhandled_error: UnhandledResponse = UnhandledResponse(
+                                type=SpeakWebSocketEvents(
+                                    SpeakWebSocketEvents.Unhandled
+                                ),
+                                raw=message,
+                            )
+                            await self._emit(
+                                SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
+                                unhandled=unhandled_error,
+                                **dict(cast(Dict[Any, Any], self._kwargs)),
+                            )
+
+            except websockets.exceptions.ConnectionClosedOK as e:
+                self._logger.notice(f"_listening({e.code}) exiting gracefully")
+                self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")
+                return
+
+            except websockets.exceptions.ConnectionClosed as e:
+                # close code 1000 is treated as a graceful shutdown, not an error
+                if e.code == 1000:
+                    self._logger.notice(f"_listening({e.code}) exiting gracefully")
+                    self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")
+                    return
+
+                self._logger.error(
+                    "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s",
+                    e.code,
+                    e.reason,
+                )
+                cc_error: ErrorResponse = ErrorResponse(
+                    "ConnectionClosed in AsyncSpeakStreamClient._listening",
+                    f"{e}",
+                    "ConnectionClosed",
+                )
+                await self._emit(
+                    SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
+                    error=cc_error,
+                    **dict(cast(Dict[Any, Any], self._kwargs)),
+                )
+
+                # signal exit and close
+                await self._signal_exit()
+
+                self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")
+
+                if self._config.options.get("termination_exception") == "true":
+                    raise
+                return
+
+            except websockets.exceptions.WebSocketException as e:
+                self._logger.error(
+                    "WebSocketException in AsyncSpeakStreamClient._listening: %s", e
+                )
+                ws_error: ErrorResponse = ErrorResponse(
+                    "WebSocketException in AsyncSpeakStreamClient._listening",
+                    f"{e}",
+                    "WebSocketException",
+                )
+                await self._emit(
+                    SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
+                    error=ws_error,
+                    **dict(cast(Dict[Any, Any], self._kwargs)),
+                )
+
+                # signal exit and close
+                await self._signal_exit()
+
+                self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")
+
+                if self._config.options.get("termination_exception") == "true":
+                    raise
+                return
+
+            except Exception as e:  # pylint: disable=broad-except
+                self._logger.error(
+                    "Exception in AsyncSpeakStreamClient._listening: %s", e
+                )
+                e_error: ErrorResponse = ErrorResponse(
+                    "Exception in AsyncSpeakStreamClient._listening",
+                    f"{e}",
+                    "Exception",
+                )
+                await self._emit(
+                    SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
+                    error=e_error,
+                    **dict(cast(Dict[Any, Any], self._kwargs)),
+                )
+
+                # signal exit and close
+                await self._signal_exit()
+
+                self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")
+
+                if self._config.options.get("termination_exception") == "true":
+                    raise
+                return
+
+    # pylint: disable=too-many-return-statements
+
+    async def send(self, text_input: str) -> bool:
+        """
+        Sends text to be synthesized over the WebSocket connection.
+
+        The text is wrapped in a ``{"type": "Speak", "text": ...}`` message.
+
+        Returns:
+            True if the message was sent, or if the connection had already been
+            closed normally; False if the client is exiting, the socket is None,
+            or sending failed.
+
+        Raises:
+            Exception: the send error is re-raised instead of being reported through
+                the return value when the config option
+                ``termination_exception_send`` is "true".
+        """
+        return await self._send_raw(json.dumps({"type": "Speak", "text": text_input}))
+
+    async def _send_raw(self, payload: str) -> bool:
+        """
+        Sends an already-serialized message over the WebSocket connection.
+
+        Shared by send() for Speak messages and by flush()/_signal_exit() for
+        control messages, which must not be wrapped in a Speak envelope. Return
+        value and error policy are those documented on send().
+        """
+        self._logger.spam("AsyncSpeakStreamClient.send ENTER")
+
+        if self._exit_event.is_set():
+            self._logger.notice("send exiting gracefully")
+            self._logger.debug("AsyncSpeakStreamClient.send LEAVE")
+            return False
+
+        if self._socket is None:
+            self._logger.spam("send() failed. socket is None")
+            self._logger.spam("AsyncSpeakStreamClient.send LEAVE")
+            return False
+
+        try:
+            await self._socket.send(payload)
+        except websockets.exceptions.ConnectionClosedOK as e:
+            self._logger.notice(f"send() exiting gracefully: {e.code}")
+            self._logger.debug("AsyncSpeakStreamClient.send LEAVE")
+            if self._config.options.get("termination_exception_send") == "true":
+                raise
+            return True
+        except websockets.exceptions.ConnectionClosed as e:
+            # 1000/1001 are treated as a normal shutdown rather than a failure
+            if e.code in [1000, 1001]:
+                self._logger.notice(f"send({e.code}) exiting gracefully")
+                self._logger.debug("AsyncSpeakStreamClient.send LEAVE")
+                if self._config.options.get("termination_exception_send") == "true":
+                    raise
+                return True
+
+            self._logger.error("send() failed - ConnectionClosed: %s", str(e))
+            self._logger.spam("AsyncSpeakStreamClient.send LEAVE")
+            if self._config.options.get("termination_exception_send") == "true":
+                raise
+            return False
+        except websockets.exceptions.WebSocketException as e:
+            self._logger.error("send() failed - WebSocketException: %s", str(e))
+            self._logger.spam("AsyncSpeakStreamClient.send LEAVE")
+            if self._config.options.get("termination_exception_send") == "true":
+                raise
+            return False
+        except Exception as e:  # pylint: disable=broad-except
+            self._logger.error("send() failed - Exception: %s", str(e))
+            self._logger.spam("AsyncSpeakStreamClient.send LEAVE")
+            if self._config.options.get("termination_exception_send") == "true":
+                raise
+            return False
+
+        self._logger.spam("send() succeeded")
+        self._logger.spam("AsyncSpeakStreamClient.send LEAVE")
+        return True
+
+    # pylint: enable=too-many-return-statements
+
+    async def flush(self) -> bool:
+        """
+        Flushes the current buffer and returns generated audio
+
+        Sends a ``{"type": "Flush"}`` control message.
+
+        Returns:
+            True if the Flush message was sent; False if the client is exiting,
+            the socket is None, or sending failed.
+        """
+        self._logger.spam("AsyncSpeakStreamClient.flush ENTER")
+
+        if self._exit_event.is_set():
+            self._logger.notice("flush exiting gracefully")
+            self._logger.debug("AsyncSpeakStreamClient.flush LEAVE")
+            return False
+
+        if self._socket is None:
+            self._logger.notice("socket is not intialized")
+            self._logger.debug("AsyncSpeakStreamClient.flush LEAVE")
+            return False
+
+        self._logger.notice("Sending Flush...")
+        # Must be awaited (an un-awaited coroutine never runs and is always truthy)
+        # and must bypass send(), which would wrap the control JSON as Speak text.
+        ret = await self._send_raw(json.dumps({"type": "Flush"}))
+
+        if not ret:
+            self._logger.error("flush failed")
+            self._logger.spam("AsyncSpeakStreamClient.flush LEAVE")
+            return False
+
+        self._logger.notice("flush succeeded")
+        self._logger.spam("AsyncSpeakStreamClient.flush LEAVE")
+
+        return True
+
+    async def finish(self) -> bool:
+        """
+        Closes the WebSocket connection gracefully.
+
+        Signals exit, closes the socket, then cancels the listener task and waits
+        up to 10 seconds for it to stop.
+
+        Returns:
+            True when shutdown completed; False if this call was itself cancelled
+            or the listener did not stop within the timeout.
+        """
+        self._logger.debug("AsyncSpeakStreamClient.finish ENTER")
+
+        # signal exit
+        await self._signal_exit()
+
+        # stop the threads
+        self._logger.verbose("cancelling tasks...")
+        try:
+            # Before cancelling, check if the tasks were created
+            # debug the threads
+            for thread in threading.enumerate():
+                self._logger.debug("before running thread: %s", thread.name)
+            self._logger.debug("number of active threads: %s", threading.active_count())
+
+            tasks = []
+
+            if self._listen_thread is not None:
+                self._listen_thread.cancel()
+                tasks.append(self._listen_thread)
+                self._logger.notice("processing _listen_thread cancel...")
+
+            # Wait for the listener to stop. return_exceptions=True keeps the
+            # CancelledError we just requested from being raised here and making a
+            # successful shutdown look like a failure.
+            await asyncio.wait_for(
+                asyncio.gather(*tasks, return_exceptions=True), timeout=10
+            )  # Prevent indefinite waiting
+            self._logger.notice("threads joined")
+
+            # debug the threads
+            for thread in threading.enumerate():
+                self._logger.debug("after running thread: %s", thread.name)
+            self._logger.debug("number of active threads: %s", threading.active_count())
+
+            self._logger.notice("finish succeeded")
+            self._logger.spam("AsyncSpeakStreamClient.finish LEAVE")
+            return True
+
+        except asyncio.CancelledError as e:
+            self._logger.error("tasks cancelled error: %s", e)
+            self._logger.debug("AsyncSpeakStreamClient.finish LEAVE")
+            return False
+
+        except asyncio.TimeoutError as e:
+            self._logger.error("tasks cancellation timed out: %s", e)
+            self._logger.debug("AsyncSpeakStreamClient.finish LEAVE")
+            return False
+
+    async def _signal_exit(self) -> None:
+        """
+        Notifies the server and the handlers of shutdown, then closes the socket.
+
+        Order: send the close control message, emit the Close event, pause briefly
+        so the message can go out, set the exit event, close the socket and reset
+        it to None. Every step is best-effort; errors are logged, not raised.
+        """
+        # send close event
+        self._logger.verbose("closing socket...")
+        if self._socket is not None:
+            self._logger.verbose("send CloseStream...")
+            try:
+                # if the socket connection is closed, the following line might throw an error
+                # Sent raw: going through send() would wrap it as Speak text.
+                # NOTE(review): confirm "CloseStream" is the close message type the
+                # text-to-speech endpoint expects.
+                await self._send_raw(json.dumps({"type": "CloseStream"}))
+            except websockets.exceptions.ConnectionClosedOK as e:
+                self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code)
+            except websockets.exceptions.ConnectionClosed as e:
+                self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code)
+            except websockets.exceptions.WebSocketException as e:
+                self._logger.error("_signal_exit - WebSocketException: %s", str(e))
+            except Exception as e:  # pylint: disable=broad-except
+                self._logger.error("_signal_exit - Exception: %s", str(e))
+
+            # push close event
+            try:
+                await self._emit(
+                    SpeakWebSocketEvents(SpeakWebSocketEvents.Close),
+                    close=CloseResponse(type=SpeakWebSocketEvents.Close),
+                    **dict(cast(Dict[Any, Any], self._kwargs)),
+                )
+            except Exception as e:  # pylint: disable=broad-except
+                self._logger.error("_emit - Exception: %s", e)
+
+            # wait for task to send
+            await asyncio.sleep(0.5)
+
+        # signal exit
+        self._exit_event.set()
+
+        # closes the WebSocket connection gracefully
+        self._logger.verbose("clean up socket...")
+        if self._socket is not None:
+            self._logger.verbose("socket.wait_closed...")
+            try:
+                await self._socket.close()
+            except websockets.exceptions.WebSocketException as e:
+                self._logger.error("socket.wait_closed failed: %s", e)
+
+        self._socket = None
diff --git a/deepgram/clients/speak/v1/websocket/client.py b/deepgram/clients/speak/v1/websocket/client.py
new file mode 100644
index 00000000..831e04eb
--- /dev/null
+++ b/deepgram/clients/speak/v1/websocket/client.py
@@ -0,0 +1,558 @@
+# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
+# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
+# SPDX-License-Identifier: MIT + +import json +import time +import logging +from typing import Dict, Union, Optional, cast, Any +import threading + +from websockets.sync.client import connect, ClientConnection +import websockets + +from .....utils import verboselogs +from .....options import DeepgramClientOptions +from ...enums import SpeakWebSocketEvents +from .helpers import convert_to_websocket_url, append_query_params +from ...errors import DeepgramError + +from .response import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + WarningResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import SpeakWebSocketOptions + + +class SpeakWebSocketClient: # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's text-to-speech services over WebSockets. + + This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. + + Args: + config (DeepgramClientOptions): all the options for the client. 
# pylint: disable=too-many-statements,too-many-branches
def start(
    self,
    options: Optional[Union[SpeakWebSocketOptions, Dict]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection for text-to-speech synthesis.

    Args:
        options: Query options, either a ``SpeakWebSocketOptions`` instance
            (validated with ``check()`` and converted with ``to_dict()``) or a
            plain dict. ``None`` means no options.
        addons: Extra query parameters merged on top of ``options``.
        headers: Extra HTTP headers merged on top of the client config headers
            for this connection only.
        members: Attributes to set directly on this client instance.
        **kwargs: Stored and forwarded as keyword arguments to every event
            handler invoked by the listening thread.

    Returns:
        True when the socket is connected and the listening thread is running,
        False when connecting failed and ``termination_exception_connect`` is
        not enabled.

    Raises:
        DeepgramError: if ``options.check()`` fails.
        Exception: the original connection error, re-raised only when the
            config option ``termination_exception_connect`` equals ``"true"``.
    """
    self._logger.debug("SpeakStreamClient.start ENTER")
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    if isinstance(options, SpeakWebSocketOptions) and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("SpeakStreamClient.start LEAVE")
        raise DeepgramError("Fatal text-to-speech options error")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    # **kwargs is always a dict (possibly empty); keep it for the handlers
    self._kwargs = kwargs

    if isinstance(options, SpeakWebSocketOptions):
        self._logger.info("SpeakWebSocketOptions switching class -> dict")
        self._options = options.to_dict()
    elif options is not None:
        self._options = options
    else:
        self._options = {}

    # Merge into a *copy*: updating self._options in place would silently
    # modify the dict object the caller passed in.
    combined_options = dict(self._options)
    if self._addons is not None:
        self._logger.info("merging addons to options")
        combined_options.update(self._addons)
        self._logger.info("new options: %s", combined_options)
    self._logger.debug("combined_options: %s", combined_options)

    # Merge into a *copy*: self._config is shared, so updating its headers in
    # place would leak per-connection headers into every later connection.
    combined_headers = dict(self._config.headers or {})
    if self._headers is not None:
        self._logger.info("merging headers to options")
        combined_headers.update(self._headers)
        self._logger.info("new headers: %s", combined_headers)
    self._logger.debug("combined_headers: %s", combined_headers)

    url_with_params = append_query_params(self._websocket_url, combined_options)
    try:
        self._socket = connect(url_with_params, additional_headers=combined_headers)
        self._exit_event.clear()

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("before running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # listening thread
        self._listen_thread = threading.Thread(target=self._listening)
        self._listen_thread.start()

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # push open event
        self._emit(
            SpeakWebSocketEvents(SpeakWebSocketEvents.Open),
            OpenResponse(type=SpeakWebSocketEvents.Open),
        )

        self._logger.notice("start succeeded")
        self._logger.debug("SpeakStreamClient.start LEAVE")
        return True
    except websockets.ConnectionClosed as e:
        self._logger.error("ConnectionClosed in SpeakStreamClient.start: %s", e)
        self._logger.debug("SpeakStreamClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") == "true":
            raise
        return False
    except websockets.exceptions.WebSocketException as e:
        self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e)
        self._logger.debug("SpeakStreamClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") == "true":
            raise
        return False
    except Exception as e:  # pylint: disable=broad-except
        self._logger.error("Exception in SpeakStreamClient.start: %s", e)
        self._logger.debug("SpeakStreamClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") == "true":
            raise
        return False


# pylint: enable=too-many-statements,too-many-branches
# pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
def _listening(
    self,
) -> None:
    """
    Receive loop executed on the listening thread.

    Binary frames are emitted as ``AudioData``. Text frames are parsed as JSON
    and routed by their ``"type"`` field to the matching response class; any
    unrecognised type is emitted as ``Unhandled``. The loop ends when the exit
    event is set, the socket is gone, or the connection closes or fails.
    """
    self._logger.debug("SpeakStreamClient._listening ENTER")

    # (event compared against "type", response class, log format, handler keyword)
    routes = (
        (SpeakWebSocketEvents.Open, OpenResponse, "OpenResponse: %s", "open"),
        (
            SpeakWebSocketEvents.Metadata,
            MetadataResponse,
            "MetadataResponse: %s",
            "metadata",
        ),
        (
            SpeakWebSocketEvents.Flush,
            FlushedResponse,
            "FlushedResponse: %s",
            "flushed",
        ),
        (SpeakWebSocketEvents.Close, CloseResponse, "CloseResponse: %s", "close"),
        (
            SpeakWebSocketEvents.Warning,
            WarningResponse,
            "WarningResponse: %s",
            "warning",
        ),
        (SpeakWebSocketEvents.Error, ErrorResponse, "ErrorResponse: %s", "error"),
    )

    def shutdown_after(failure) -> None:
        # Shared tail of every failure path: notify handlers, then tear down.
        self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), failure)

        # signal exit and close
        self._signal_exit()

        self._logger.debug("SpeakStreamClient._listening LEAVE")

    while True:
        try:
            if self._exit_event.is_set():
                self._logger.notice("_listening exiting gracefully")
                self._logger.debug("SpeakStreamClient._listening LEAVE")
                return

            if self._socket is None:
                self._logger.warning("socket is empty")
                self._logger.debug("SpeakStreamClient._listening LEAVE")
                return

            message = self._socket.recv()

            if message is None:
                self._logger.info("message is empty")
                continue

            # Raw audio arrives as binary frames and needs no parsing.
            if isinstance(message, bytes):
                self._logger.debug("Binary data received")
                self._emit(
                    SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData),
                    data=message,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )
                continue

            data = json.loads(message)
            response_type = data.get("type")
            self._logger.debug("response_type: %s, data: %s", response_type, data)

            for event, response_cls, log_format, keyword in routes:
                if response_type == event:
                    parsed = response_cls.from_json(message)
                    self._logger.verbose(log_format, parsed)
                    self._emit(
                        SpeakWebSocketEvents(event),
                        **{keyword: parsed},
                        **dict(cast(Dict[Any, Any], self._kwargs)),
                    )
                    break
            else:
                # No route matched: surface the raw message to the caller.
                self._logger.warning(
                    "Unknown Message: response_type: %s, data: %s",
                    response_type,
                    data,
                )
                unhandled_error: UnhandledResponse = UnhandledResponse(
                    type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
                    raw=message,
                )
                self._emit(
                    SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
                    unhandled=unhandled_error,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )

        except websockets.exceptions.ConnectionClosedOK as e:
            self._logger.notice(f"_listening({e.code}) exiting gracefully")
            self._logger.debug("SpeakStreamClient._listening LEAVE")
            return

        except websockets.exceptions.ConnectionClosed as e:
            # Close code 1000 is treated as a normal shutdown, not an error.
            if e.code == 1000:
                self._logger.notice(f"_listening({e.code}) exiting gracefully")
                self._logger.debug("SpeakStreamClient._listening LEAVE")
                return

            self._logger.error(
                "ConnectionClosed in SpeakStreamClient._listening with code %s: %s",
                e.code,
                e.reason,
            )
            cc_error: ErrorResponse = ErrorResponse(
                "ConnectionClosed in SpeakStreamClient._listening",
                f"{e}",
                "ConnectionClosed",
            )
            shutdown_after(cc_error)

            if self._config.options.get("termination_exception") == "true":
                raise
            return

        except websockets.exceptions.WebSocketException as e:
            self._logger.error(
                "WebSocketException in SpeakStreamClient._listening with: %s", e
            )
            ws_error: ErrorResponse = ErrorResponse(
                "WebSocketException in SpeakStreamClient._listening",
                f"{e}",
                "WebSocketException",
            )
            shutdown_after(ws_error)

            if self._config.options.get("termination_exception") == "true":
                raise
            return

        except Exception as e:  # pylint: disable=broad-except
            self._logger.error("Exception in SpeakStreamClient._listening: %s", e)
            e_error: ErrorResponse = ErrorResponse(
                "Exception in SpeakStreamClient._listening",
                f"{e}",
                "Exception",
            )
            self._logger.error(
                "Exception in SpeakStreamClient._listening: %s", str(e)
            )
            shutdown_after(e_error)

            if self._config.options.get("termination_exception") == "true":
                raise
            return
+ """ + self._logger.spam("SpeakStreamClient.send ENTER") + + if self._exit_event.is_set(): + self._logger.notice("send exiting gracefully") + self._logger.debug("SpeakStreamClient.send LEAVE") + return False + + if self._socket is not None: + with self._lock_send: + try: + self._socket.send(json.dumps({"type": "Speak", "text": text_input})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"send() exiting gracefully: {e.code}") + self._logger.debug("SpeakStreamClient._keep_alive LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + except websockets.exceptions.ConnectionClosed as e: + if e.code == 1000: + self._logger.notice(f"send({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient.send LEAVE") + if ( + self._config.options.get("termination_exception_send") + == "true" + ): + raise + return True + self._logger.error("send() failed - ConnectionClosed: %s", str(e)) + self._logger.spam("SpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("send() failed - WebSocketException: %s", str(e)) + self._logger.spam("SpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("send() failed - Exception: %s", str(e)) + self._logger.spam("SpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + + self._logger.spam("send() succeeded") + self._logger.spam("SpeakStreamClient.send LEAVE") + return True + + self._logger.spam("send() failed. 
def flush(self) -> bool:
    """
    Flushes the current buffer and returns generated audio.

    The ``{"type": "Flush"}`` control message is written directly to the
    socket. It must not go through ``send()``: that method wraps its argument
    in a ``{"type": "Speak", "text": ...}`` envelope, so the flush command
    would be sent as text to synthesize and never as a control frame.

    Returns:
        True when the Flush message was written to the socket. False when the
        client is shutting down, no socket exists yet (``start()`` has not been
        called or the socket was already closed), or the write failed.

    Raises:
        Exception: the underlying send error, re-raised only when the config
            option ``termination_exception_send`` equals ``"true"``.
    """
    self._logger.spam("SpeakStreamClient.flush ENTER")

    if self._exit_event.is_set():
        self._logger.notice("flush exiting gracefully")
        self._logger.debug("SpeakStreamClient.flush LEAVE")
        return False

    # _socket is only assigned by start(); getattr keeps flush() safe when it
    # is called on a client that was never started.
    socket = getattr(self, "_socket", None)
    if socket is None:
        self._logger.notice("socket is not intialized")
        self._logger.debug("SpeakStreamClient.flush LEAVE")
        return False

    self._logger.notice("Sending Flush...")
    try:
        # Same lock as send(), so control and text frames never interleave.
        with self._lock_send:
            socket.send(json.dumps({"type": "Flush"}))
    except Exception as e:  # pylint: disable=broad-except
        self._logger.error("flush failed: %s", e)
        self._logger.spam("SpeakStreamClient.flush LEAVE")
        if self._config.options.get("termination_exception_send") == "true":
            raise
        return False

    self._logger.notice("flush succeeded")
    self._logger.spam("SpeakStreamClient.flush LEAVE")

    return True
+ """ + self._logger.spam("SpeakStreamClient.finish ENTER") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # signal exit + self._signal_exit() + + # stop the threads + + if self._listen_thread is not None: + self._listen_thread.join() + self._listen_thread = None + self._logger.notice("listening thread joined") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("SpeakStreamClient.finish LEAVE") + return True + + # signals the WebSocket connection to exit + def _signal_exit(self) -> None: + # closes the WebSocket connection gracefully + self._logger.notice("closing socket...") + if self._socket is not None: + self._logger.notice("sending Close...") + try: + # if the socket connection is closed, the following line might throw an error + self._socket.send(json.dumps({"type": "Close"})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) + except websockets.exceptions.ConnectionClosed as e: + self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) + except websockets.exceptions.WebSocketException as e: + self._logger.error("_signal_exit - WebSocketException: %s", str(e)) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", str(e)) + + # push close event + try: + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + CloseResponse(type=SpeakWebSocketEvents.Close), + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", e) + + # wait for task to send + time.sleep(0.5) + + # signal exit + 
# This function appends query parameters to a URL
def append_query_params(url: str, params: Optional[Dict] = None):
    """
    Appends query parameters to a URL.

    Args:
        url: URL that may already carry a query string.
        params: Mapping of parameter name to value. ``None`` values are
            skipped, booleans are rendered as ``"true"``/``"false"``, list
            values are appended to any values already present for that key,
            and any other value replaces the existing value for that key.

    Returns:
        The URL with the merged, re-encoded query string.
    """
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)

    if params is not None:
        for key, value in params.items():
            if value is None:
                continue
            if isinstance(value, bool):
                # str(True) is "True"; the query string uses lowercase
                value = str(value).lower()
            if isinstance(value, list):
                if value:
                    query_params[key] = query_params.get(key, []) + [
                        str(item) for item in value
                    ]
            else:
                query_params[key] = [str(value)]

    updated_query_string = urlencode(query_params, doseq=True)
    return parsed_url._replace(query=updated_query_string).geturl()


# This function converts a URL to a WebSocket URL
def convert_to_websocket_url(base_url: str, endpoint: str):
    """
    Converts a URL to a WebSocket URL.

    A leading ``http://`` or ``https://`` (any letter case) is removed and the
    result is given the ``wss://`` scheme unless it already starts with
    ``ws://`` or ``wss://``. Only the host[:port] part of ``base_url`` is kept;
    ``endpoint`` becomes the path.

    Args:
        base_url: Host, or URL with an http(s)/ws(s) scheme.
        endpoint: Path of the WebSocket endpoint, e.g. ``"v1/speak"``.

    Returns:
        The WebSocket URL as a string.
    """
    # Strip the scheme with the same case-insensitive rule used to detect it.
    # A case-sensitive str.replace would leave "HTTPS://" in place and then
    # yield "wss://HTTPS://host", which does not parse to the intended host.
    base_url = re.sub(r"^https?://", "", base_url, flags=re.IGNORECASE)
    if not re.match(r"^wss?://", base_url, re.IGNORECASE):
        base_url = "wss://" + base_url
    parsed_url = urlparse(base_url)
    domain = parsed_url.netloc
    return urlunparse((parsed_url.scheme, domain, endpoint, "", "", ""))
# Speak Response Types:


class _DictAccessMixin(DataClassJsonMixin):
    """
    Shared dict-style access and JSON string form for the response dataclasses.

    Every response type in this module exposes the same three behaviours, so
    they are defined once here instead of being repeated in each class.
    """

    def __getitem__(self, key):
        # read through the serialized form, so keys match the JSON field names
        _dict = self.to_dict()
        return _dict[key]

    def __setitem__(self, key, val):
        self.__dict__[key] = val

    def __str__(self) -> str:
        return self.to_json(indent=4)


@dataclass
class SpeakWebSocketResponse(
    _DictAccessMixin
):  # pylint: disable=too-many-instance-attributes
    """
    A class for representing a response from the speak (streaming) endpoint.
    """

    content_type: str = ""
    request_id: str = ""
    model_uuid: str = ""
    model_name: str = ""
    date: str = ""
    stream: Optional[io.BytesIO] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    # this is a hack to make the response look like a dict because of the io.BytesIO object
    # otherwise it will throw an exception on printing
    def __str__(self) -> str:
        my_dict = self.to_dict()
        return my_dict.__str__()


@dataclass
class OpenResponse(_DictAccessMixin):
    """
    Open Message from the Deepgram Platform
    """

    type: str = ""


@dataclass
class MetadataResponse(_DictAccessMixin):
    """
    Metadata object
    """

    request_id: str = ""


@dataclass
class FlushedResponse(_DictAccessMixin):
    """
    Flushed Message from the Deepgram Platform
    """

    type: str = ""


@dataclass
class CloseResponse(_DictAccessMixin):
    """
    Close Message from the Deepgram Platform
    """

    type: str = ""


@dataclass
class ErrorResponse(_DictAccessMixin):
    """
    Error Message from the Deepgram Platform
    """

    description: str = ""
    message: str = ""
    type: str = ""
    variant: Optional[str] = ""


@dataclass
class WarningResponse(_DictAccessMixin):
    """
    Warning Message from the Deepgram Platform
    """

    warn_code: str = ""
    warn_msg: str = ""
    type: str = ""


# Unhandled Message


@dataclass
class UnhandledResponse(_DictAccessMixin):
    """
    Unhandled Message from the Deepgram Platform
    """

    type: str = ""
    raw: str = ""
+# SPDX-License-Identifier: MIT + +import asyncio +import sounddevice as sd +import numpy as np +import time +from deepgram.utils import verboselogs + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakWebSocketEvents, + SpeakOptions, +) + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + + +async def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions(verbose=verboselogs.DEBUG) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.asyncwebsocket.v("1") + + async def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + async def on_binary_data(self, data, **kwargs): + print("Received binary data") + array = np.frombuffer(data, dtype=np.int16) + sd.play(array, 48000) + sd.wait() + + async def on_metadata(self, metadata, **kwargs): + print(f"\n\n{metadata}\n\n") + + async def on_flush(self, flush, **kwargs): + print(f"\n\n{flush}\n\n") + + async def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + async def on_warning(self, warning, **kwargs): + print(f"\n\n{warning}\n\n") + + async def on_error(self, error, **kwargs): + print(f"\n\n{error}\n\n") + + async def on_unhandled(self, unhandled, **kwargs): + print(f"\n\n{unhandled}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Metadata, on_metadata) + dg_connection.on(SpeakWebSocketEvents.Flush, on_flush) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + dg_connection.on(SpeakWebSocketEvents.Error, on_error) + dg_connection.on(SpeakWebSocketEvents.Warning, on_warning) + dg_connection.on(SpeakWebSocketEvents.Unhandled, on_unhandled) + + # connect to 
def main():
    """
    Interactive text-to-speech example over the synchronous WebSocket client.

    Registers a handler for every speak event, sends ``TTS_TEXT``, plays each
    audio chunk as it arrives, and closes the connection when Enter is pressed.
    """
    try:
        # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
        # config: DeepgramClientOptions = DeepgramClientOptions(verbose=verboselogs.DEBUG)
        # deepgram: DeepgramClient = DeepgramClient("", config)
        # otherwise, use default config
        deepgram: DeepgramClient = DeepgramClient()

        # Create a websocket connection to Deepgram
        dg_connection = deepgram.speak.websocket.v("1")

        # NOTE: handler parameter names must match the keyword the client uses
        # when it emits the event (open, data, metadata, flushed, close,
        # warning, error, unhandled); a mismatch raises TypeError inside the
        # listening thread.
        def on_open(self, open, **kwargs):  # pylint: disable=redefined-builtin
            print(f"\n\n{open}\n\n")

        def on_binary_data(self, data, **kwargs):
            print("Received binary data")
            # linear16 audio: interpret the raw bytes as 16-bit samples
            array = np.frombuffer(data, dtype=np.int16)
            sd.play(array, 48000)
            sd.wait()

        def on_metadata(self, metadata, **kwargs):
            print(f"\n\n{metadata}\n\n")

        def on_flush(self, flushed, **kwargs):
            print(f"\n\n{flushed}\n\n")

        def on_close(self, close, **kwargs):
            print(f"\n\n{close}\n\n")

        def on_warning(self, warning, **kwargs):
            print(f"\n\n{warning}\n\n")

        def on_error(self, error, **kwargs):
            print(f"\n\n{error}\n\n")

        def on_unhandled(self, unhandled, **kwargs):
            print(f"\n\n{unhandled}\n\n")

        dg_connection.on(SpeakWebSocketEvents.Open, on_open)
        dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data)
        dg_connection.on(SpeakWebSocketEvents.Metadata, on_metadata)
        dg_connection.on(SpeakWebSocketEvents.Flush, on_flush)
        dg_connection.on(SpeakWebSocketEvents.Close, on_close)
        dg_connection.on(SpeakWebSocketEvents.Error, on_error)
        dg_connection.on(SpeakWebSocketEvents.Warning, on_warning)
        dg_connection.on(SpeakWebSocketEvents.Unhandled, on_unhandled)

        # connect to websocket
        options = SpeakOptions(
            model="aura-asteria-en",
            encoding="linear16",
            container="none",
            sample_rate=48000,
        )

        print("\n\nPress Enter to stop...\n\n")
        if dg_connection.start(options) is False:
            print("Failed to start connection")
            return

        try:
            # send the text to Deepgram
            dg_connection.send(TTS_TEXT)

            # Indicate that we've finished
            time.sleep(5)
            print("\n\nPress Enter to stop...\n\n")
            input()
        finally:
            # Always close: the client's listening thread is a regular
            # (non-daemon) thread, so skipping finish() after an error would
            # leave it blocked on the socket and keep the process alive.
            dg_connection.finish()

        print("Finished")

    except ValueError as e:
        print(f"Invalid value encountered: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions(verbose=verboselogs.DEBUG) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.websocket.v("1") + + def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + def on_binary_data(self, data, **kwargs): + print("Received binary data") + with open(AUDIO_FILE, "wb") as f: + f.write(data) + playsound(AUDIO_FILE) + + def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + + # connect to websocket + options = SpeakOptions( + model="aura-asteria-en", + encoding="linear16", + container="wav", + sample_rate=48000, + ) + + print("\n\nPress Enter to stop...\n\n") + if dg_connection.start(options) is False: + print("Failed to start connection") + return + + # send the text to Deepgram + dg_connection.send(TTS_TEXT) + + # Indicate that we've finished + time.sleep(5) + print("\n\nPress Enter to stop...\n\n") + input() + dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/text-to-speech/websocket/simple/main-non-containerized.py b/examples/text-to-speech/websocket/simple/main-non-containerized.py new file mode 100644 index 00000000..a334a3b0 --- /dev/null +++ b/examples/text-to-speech/websocket/simple/main-non-containerized.py @@ -0,0 +1,78 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. 
def main():
    """
    Minimal text-to-speech example streaming raw (non-containerized) audio.

    Requests headerless linear16 audio at 48 kHz, plays each binary chunk as it
    arrives, and closes the connection when Enter is pressed.
    """
    try:
        # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
        # config: DeepgramClientOptions = DeepgramClientOptions(verbose=verboselogs.DEBUG)
        # deepgram: DeepgramClient = DeepgramClient("", config)
        # otherwise, use default config
        deepgram: DeepgramClient = DeepgramClient()

        # Create a websocket connection to Deepgram
        dg_connection = deepgram.speak.websocket.v("1")

        def on_open(self, open, **kwargs):  # pylint: disable=redefined-builtin
            print(f"\n\n{open}\n\n")

        def on_binary_data(self, data, **kwargs):
            print("Received binary data")
            # container="none" means the bytes are bare 16-bit samples
            array = np.frombuffer(data, dtype=np.int16)
            sd.play(array, 48000)
            sd.wait()

        def on_close(self, close, **kwargs):
            print(f"\n\n{close}\n\n")

        dg_connection.on(SpeakWebSocketEvents.Open, on_open)
        dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data)
        dg_connection.on(SpeakWebSocketEvents.Close, on_close)

        # connect to websocket
        options = SpeakOptions(
            model="aura-asteria-en",
            encoding="linear16",
            container="none",
            sample_rate=48000,
        )

        print("\n\nPress Enter to stop...\n\n")
        if dg_connection.start(options) is False:
            print("Failed to start connection")
            return

        try:
            # send the text to Deepgram
            dg_connection.send(TTS_TEXT)

            # Indicate that we've finished
            time.sleep(5)
            print("\n\nPress Enter to stop...\n\n")
            input()
        finally:
            # Close even if sending or waiting for input failed, so the
            # connection's listening thread is shut down and joined.
            dg_connection.finish()

        print("Finished")

    except ValueError as e:
        print(f"Invalid value encountered: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")