diff --git a/deepgram/__init__.py b/deepgram/__init__.py index f5971186..f0984073 100644 --- a/deepgram/__init__.py +++ b/deepgram/__init__.py @@ -96,7 +96,7 @@ from .client import ( SpeakOptions, SpeakRESTOptions, - SpeakWebSocketOptions, + # SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, @@ -115,21 +115,21 @@ SpeakRESTResponse, ) -## speak WebSocket -from .client import ( - SpeakWebSocketClient, - AsyncSpeakWebSocketClient, -) -from .client import ( - SpeakWebSocketResponse, - # OpenResponse, - # MetadataResponse, - FlushedResponse, - # CloseResponse, - # UnhandledResponse, - WarningResponse, - # ErrorResponse, -) +# ## speak WebSocket +# from .client import ( +# SpeakWebSocketClient, +# AsyncSpeakWebSocketClient, +# ) +# from .client import ( +# SpeakWebSocketResponse, +# # OpenResponse, +# # MetadataResponse, +# FlushedResponse, +# # CloseResponse, +# # UnhandledResponse, +# WarningResponse, +# # ErrorResponse, +# ) # manage from .client import ManageClient, AsyncManageClient diff --git a/deepgram/client.py b/deepgram/client.py index 7f8daf45..05ce0bd7 100644 --- a/deepgram/client.py +++ b/deepgram/client.py @@ -100,7 +100,7 @@ from .clients import ( SpeakOptions, SpeakRESTOptions, - SpeakWebSocketOptions, + # SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, @@ -119,21 +119,21 @@ SpeakRESTResponse, ) -## speak WebSocket -from .clients import ( - SpeakWebSocketClient, - AsyncSpeakWebSocketClient, -) -from .clients import ( - SpeakWebSocketResponse, - # OpenResponse, - # MetadataResponse, - FlushedResponse, - # CloseResponse, - # UnhandledResponse, - WarningResponse, - # ErrorResponse, -) +# ## speak WebSocket +# from .clients import ( +# SpeakWebSocketClient, +# AsyncSpeakWebSocketClient, +# ) +# from .clients import ( +# SpeakWebSocketResponse, +# # OpenResponse, +# # MetadataResponse, +# FlushedResponse, +# # CloseResponse, +# # UnhandledResponse, +# WarningResponse, +# # ErrorResponse, +# ) # manage client classes/input from .clients import ManageClient, AsyncManageClient diff --git a/deepgram/clients/__init__.py b/deepgram/clients/__init__.py index 4043e57e..8aa1661e 100644 --- a/deepgram/clients/__init__.py +++ b/deepgram/clients/__init__.py @@ -105,7 +105,7 @@ from .speak import ( SpeakOptions, SpeakRESTOptions, - SpeakWebSocketOptions, + # SpeakWebSocketOptions, # FileSource, SpeakRestSource, SpeakSource, @@ -125,21 +125,21 @@ SpeakRESTResponse, ) -## text-to-speech WebSocket -from .speak import ( - SpeakWebSocketClient, - AsyncSpeakWebSocketClient, -) -from .speak import ( - SpeakWebSocketResponse, - # OpenResponse, - # MetadataResponse, - FlushedResponse, - # CloseResponse, - # UnhandledResponse, - WarningResponse, - # ErrorResponse, -) +# ## text-to-speech WebSocket +# from .speak import ( +# SpeakWebSocketClient, +# AsyncSpeakWebSocketClient, +# ) +# from .speak import ( +# SpeakWebSocketResponse, +# # OpenResponse, +# # MetadataResponse, +# FlushedResponse, +# # CloseResponse, +# # UnhandledResponse, +# WarningResponse, +# # ErrorResponse, +# ) # manage from .manage import ManageClient, AsyncManageClient diff --git a/deepgram/clients/speak/__init__.py b/deepgram/clients/speak/__init__.py index edf4d385..07d8f42e 100644 --- a/deepgram/clients/speak/__init__.py +++ b/deepgram/clients/speak/__init__.py @@ -9,13 +9,13 @@ SpeakClient, # backward compat SpeakRESTClient, AsyncSpeakRESTClient, - SpeakWebSocketClient, - AsyncSpeakWebSocketClient, + # SpeakWebSocketClient, + # AsyncSpeakWebSocketClient, ) from .client import ( SpeakOptions, SpeakRESTOptions, - SpeakWebSocketOptions, + # SpeakWebSocketOptions, FileSource, SpeakRestSource, SpeakSource, @@ -23,12 +23,12 @@ from .client import ( SpeakResponse, # backward compat SpeakRESTResponse, - SpeakWebSocketResponse, - OpenResponse, - MetadataResponse, - FlushedResponse, - CloseResponse, - UnhandledResponse, - WarningResponse, - ErrorResponse, + # SpeakWebSocketResponse, + # OpenResponse, + # MetadataResponse, + # FlushedResponse, + # CloseResponse, + # UnhandledResponse, + # WarningResponse, + # ErrorResponse, ) diff --git a/deepgram/clients/speak/client.py b/deepgram/clients/speak/client.py index e429fba0..fc5e902e 100644 --- a/deepgram/clients/speak/client.py +++ b/deepgram/clients/speak/client.py @@ -5,27 +5,27 @@ from .v1 import ( SpeakRESTClient as SpeakRESTClientLatest, AsyncSpeakRESTClient as AsyncSpeakRESTClientLatest, - SpeakWebSocketClient as SpeakWebSocketClientLatest, - AsyncSpeakWebSocketClient as AsyncSpeakWebSocketClientLatest, + # SpeakWebSocketClient as SpeakWebSocketClientLatest, + # AsyncSpeakWebSocketClient as AsyncSpeakWebSocketClientLatest, ) from .v1 import ( SpeakOptions as SpeakOptionsLatest, SpeakRESTOptions as SpeakRESTOptionsLatest, - SpeakWebSocketOptions as SpeakWebSocketOptionsLatest, + # SpeakWebSocketOptions as SpeakWebSocketOptionsLatest, FileSource as FileSourceLatest, SpeakRestSource as SpeakRestSourceLatest, SpeakSource as SpeakSourceLatest, ) from .v1 import ( SpeakRESTResponse as SpeakRESTResponseLatest, - SpeakWebSocketResponse as SpeakWebSocketResponseLatest, - OpenResponse as OpenResponseLatest, - MetadataResponse as MetadataResponseLatest, - FlushedResponse as FlushedResponseLatest, - CloseResponse as CloseResponseLatest, - UnhandledResponse as UnhandledResponseLatest, - WarningResponse as WarningResponseLatest, - ErrorResponse as ErrorResponseLatest, + # SpeakWebSocketResponse as SpeakWebSocketResponseLatest, + # OpenResponse as OpenResponseLatest, + # MetadataResponse as MetadataResponseLatest, + # FlushedResponse as FlushedResponseLatest, + # CloseResponse as CloseResponseLatest, + # UnhandledResponse as UnhandledResponseLatest, + # WarningResponse as WarningResponseLatest, + # ErrorResponse as ErrorResponseLatest, ) # The client.py points to the current supported version in the SDK. @@ -35,21 +35,21 @@ # input SpeakOptions = SpeakOptionsLatest SpeakRESTOptions = SpeakRESTOptionsLatest -SpeakWebSocketOptions = SpeakWebSocketOptionsLatest +# SpeakWebSocketOptions = SpeakWebSocketOptionsLatest SpeakRestSource = SpeakRestSourceLatest FileSource = FileSourceLatest SpeakSource = SpeakSourceLatest # output SpeakRESTResponse = SpeakRESTResponseLatest -SpeakWebSocketResponse = SpeakWebSocketResponseLatest -OpenResponse = OpenResponseLatest -MetadataResponse = MetadataResponseLatest -FlushedResponse = FlushedResponseLatest -CloseResponse = CloseResponseLatest -UnhandledResponse = UnhandledResponseLatest -WarningResponse = WarningResponseLatest -ErrorResponse = ErrorResponseLatest +# SpeakWebSocketResponse = SpeakWebSocketResponseLatest +# OpenResponse = OpenResponseLatest +# MetadataResponse = MetadataResponseLatest +# FlushedResponse = FlushedResponseLatest +# CloseResponse = CloseResponseLatest +# UnhandledResponse = UnhandledResponseLatest +# WarningResponse = WarningResponseLatest +# ErrorResponse = ErrorResponseLatest # backward compatibility @@ -59,5 +59,5 @@ # clients SpeakRESTClient = SpeakRESTClientLatest AsyncSpeakRESTClient = AsyncSpeakRESTClientLatest -SpeakWebSocketClient = SpeakWebSocketClientLatest -AsyncSpeakWebSocketClient = AsyncSpeakWebSocketClientLatest +# SpeakWebSocketClient = SpeakWebSocketClientLatest +# AsyncSpeakWebSocketClient = AsyncSpeakWebSocketClientLatest diff --git a/deepgram/clients/speak/v1/__init__.py b/deepgram/clients/speak/v1/__init__.py index b8fa18eb..97e63d7a 100644 --- a/deepgram/clients/speak/v1/__init__.py +++ b/deepgram/clients/speak/v1/__init__.py @@ -11,9 +11,10 @@ SpeakSource, FileSource, ) -from .websocket import ( - SpeakWebSocketOptions, -) + +# from .websocket import ( +# SpeakWebSocketOptions, +# ) from ....options import DeepgramClientOptions, ClientOptionsFromEnv # rest @@ -21,15 +22,15 @@ from .rest import SpeakRESTResponse -# websocket -from .websocket import SpeakWebSocketClient, AsyncSpeakWebSocketClient -from .websocket import ( - SpeakWebSocketResponse, - OpenResponse, - MetadataResponse, - FlushedResponse, - CloseResponse, - UnhandledResponse, - WarningResponse, - ErrorResponse, -) +# # websocket +# from .websocket import SpeakWebSocketClient, AsyncSpeakWebSocketClient +# from .websocket import ( +# SpeakWebSocketResponse, +# OpenResponse, +# MetadataResponse, +# FlushedResponse, +# CloseResponse, +# UnhandledResponse, +# WarningResponse, +# ErrorResponse, +# ) diff --git a/deepgram/clients/speak/v1/websocket/__init__.py b/deepgram/clients/speak/v1/websocket/__init__.py deleted file mode 100644 index e6698987..00000000 --- a/deepgram/clients/speak/v1/websocket/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. -# Use of this source code is governed by a MIT license that can be found in the LICENSE file. -# SPDX-License-Identifier: MIT - -from .client import SpeakWebSocketClient -from .async_client import AsyncSpeakWebSocketClient -from .response import ( - SpeakWebSocketResponse, - OpenResponse, - MetadataResponse, - FlushedResponse, - CloseResponse, - UnhandledResponse, - WarningResponse, - ErrorResponse, -) -from .options import SpeakWebSocketOptions diff --git a/deepgram/clients/speak/v1/websocket/async_client.py b/deepgram/clients/speak/v1/websocket/async_client.py deleted file mode 100644 index 7baea70b..00000000 --- a/deepgram/clients/speak/v1/websocket/async_client.py +++ /dev/null @@ -1,611 +0,0 @@ -# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. -# Use of this source code is governed by a MIT license that can be found in the LICENSE file. -# SPDX-License-Identifier: MIT -import asyncio -import json -import logging -from typing import Dict, Union, Optional, cast, Any -import threading - -import websockets -from websockets.client import WebSocketClientProtocol - -from .....utils import verboselogs -from .....options import DeepgramClientOptions -from ...enums import SpeakWebSocketEvents -from .helpers import convert_to_websocket_url, append_query_params -from ...errors import DeepgramError - -from .response import ( - OpenResponse, - MetadataResponse, - FlushedResponse, - CloseResponse, - WarningResponse, - ErrorResponse, - UnhandledResponse, -) -from .options import SpeakWebSocketOptions - - -class AsyncSpeakWebSocketClient: # pylint: disable=too-many-instance-attributes - """ - Client for interacting with Deepgram's text-to-speech services over WebSockets. - - This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. - - Args: - config (DeepgramClientOptions): all the options for the client. - """ - - _logger: verboselogs.VerboseLogger - _config: DeepgramClientOptions - _endpoint: str - _websocket_url: str - - _socket: WebSocketClientProtocol - _event_handlers: Dict[SpeakWebSocketEvents, list] - - _listen_thread: Union[asyncio.Task, None] - - _kwargs: Optional[Dict] = None - _addons: Optional[Dict] = None - _options: Optional[Dict] = None - _headers: Optional[Dict] = None - - def __init__(self, config: DeepgramClientOptions): - if config is None: - raise DeepgramError("Config are required") - - self._logger = verboselogs.VerboseLogger(__name__) - self._logger.addHandler(logging.StreamHandler()) - self._logger.setLevel(config.verbose) - - self._config = config - self._endpoint = "v1/speak" - - self._listen_thread = None - - # exit - self._exit_event = asyncio.Event() - - self._event_handlers = { - event: [] for event in SpeakWebSocketEvents.__members__.values() - } - self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) - - # pylint: disable=too-many-branches,too-many-statements - async def start( - self, - options: Optional[Union[SpeakWebSocketOptions, Dict]] = None, - addons: Optional[Dict] = None, - headers: Optional[Dict] = None, - members: Optional[Dict] = None, - **kwargs, - ) -> bool: - """ - Starts the WebSocket connection for text-to-speech synthesis. - """ - self._logger.debug("AsyncSpeakStreamClient.start ENTER") - self._logger.info("options: %s", options) - self._logger.info("addons: %s", addons) - self._logger.info("headers: %s", headers) - self._logger.info("members: %s", members) - self._logger.info("kwargs: %s", kwargs) - - if isinstance(options, SpeakWebSocketOptions) and not options.check(): - self._logger.error("options.check failed") - self._logger.debug("SpeakStreamClient.start LEAVE") - raise DeepgramError("Fatal text-to-speech options error") - - self._addons = addons - self._headers = headers - - # add "members" as members of the class - if members is not None: - self.__dict__.update(members) - - # set kwargs as members of the class - if kwargs is not None: - self._kwargs = kwargs - else: - self._kwargs = {} - - if isinstance(options, SpeakWebSocketOptions): - self._logger.info("SpeakWebSocketOptions switching class -> dict") - self._options = options.to_dict() - elif options is not None: - self._options = options - else: - self._options = {} - - combined_options = self._options - if self._addons is not None: - self._logger.info("merging addons to options") - combined_options.update(self._addons) - self._logger.info("new options: %s", combined_options) - self._logger.debug("combined_options: %s", combined_options) - - combined_headers = self._config.headers - if self._headers is not None: - self._logger.info("merging headers to options") - combined_headers.update(self._headers) - self._logger.info("new headers: %s", combined_headers) - self._logger.debug("combined_headers: %s", combined_headers) - - url_with_params = append_query_params(self._websocket_url, combined_options) - - try: - self._socket = await websockets.connect( - url_with_params, extra_headers=combined_headers - ) - self._exit_event.clear() - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("after running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - # listen thread - self._listen_thread = asyncio.create_task(self._listening()) - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("after running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - # push open event - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Open), - OpenResponse(type=SpeakWebSocketEvents.Open), - ) - - self._logger.notice("start succeeded") - self._logger.debug("AsyncSpeakStreamClient.start LEAVE") - return True - except websockets.ConnectionClosed as e: - self._logger.error( - "ConnectionClosed in AsyncSpeakStreamClient.start: %s", e - ) - self._logger.debug("AsyncSpeakStreamClient.start LEAVE") - if self._config.options.get("termination_exception_connect") == "true": - raise - return False - except websockets.exceptions.WebSocketException as e: - self._logger.error( - "WebSocketException in AsyncSpeakStreamClient.start: %s", e - ) - self._logger.debug("AsyncSpeakStreamClient.start LEAVE") - if self._config.options.get("termination_exception_connect") == "true": - raise - return False - except Exception as e: # pylint: disable=broad-except - self._logger.error( - "WebSocketException in AsyncSpeakStreamClient.start: %s", e - ) - self._logger.debug("AsyncSpeakStreamClient.start LEAVE") - if self._config.options.get("termination_exception_connect") == "true": - raise - return False - - # pylint: enable=too-many-branches,too-many-statements - - def on(self, event: SpeakWebSocketEvents, handler) -> None: - """ - Registers event handlers for specific events. - """ - self._logger.info("event subscribed: %s", event) - if event in SpeakWebSocketEvents.__members__.values() and callable(handler): - if handler not in self._event_handlers[event]: - self._event_handlers[event].append(handler) - - # triggers the registered event handlers for a specific event - async def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: - """ - Emits events to the registered event handlers. - """ - self._logger.debug("AsyncSpeakStreamClient._emit ENTER") - self._logger.debug("callback handlers for: %s", event) - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("after running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - tasks = [] - for handler in self._event_handlers[event]: - task = asyncio.create_task(handler(self, *args, **kwargs)) - tasks.append(task) - - if tasks: - self._logger.debug("waiting for tasks to finish...") - await asyncio.gather(*tasks, return_exceptions=True) - tasks.clear() - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("after running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - self._logger.debug("AsyncSpeakStreamClient._emit LEAVE") - - # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches - async def _listening(self) -> None: - """ - Listens for messages from the WebSocket connection. - """ - self._logger.debug("AsyncSpeakStreamClient._listening ENTER") - - while True: - try: - if self._exit_event.is_set(): - self._logger.notice("_listening exiting gracefully") - self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") - return - - if self._socket is None: - self._logger.warning("socket is empty") - self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") - return - - message = await self._socket.recv() - - if message is None: - self._logger.spam("message is None") - continue - - if isinstance(message, bytes): - self._logger.debug("Binary data received") - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), - data=message, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - else: - data = json.loads(message) - response_type = data.get("type") - self._logger.debug( - "response_type: %s, data: %s", response_type, data - ) - - match response_type: - case SpeakWebSocketEvents.Open: - open_result: OpenResponse = OpenResponse.from_json(message) - self._logger.verbose("OpenResponse: %s", open_result) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Open), - open=open_result, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Metadata: - meta_result: MetadataResponse = MetadataResponse.from_json( - message - ) - self._logger.verbose("MetadataResponse: %s", meta_result) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), - metadata=meta_result, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Flush: - fl_result: FlushedResponse = FlushedResponse.from_json( - message - ) - self._logger.verbose("FlushedResponse: %s", fl_result) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Flush), - flushed=fl_result, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Close: - close_result: CloseResponse = CloseResponse.from_json( - message - ) - self._logger.verbose("CloseResponse: %s", close_result) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Close), - close=close_result, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Warning: - war_warning: WarningResponse = WarningResponse.from_json( - message - ) - self._logger.verbose("WarningResponse: %s", war_warning) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), - warning=war_warning, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Error: - err_error: ErrorResponse = ErrorResponse.from_json(message) - self._logger.verbose("ErrorResponse: %s", err_error) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Error), - error=err_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case _: - self._logger.warning( - "Unknown Message: response_type: %s, data: %s", - response_type, - data, - ) - unhandled_error: UnhandledResponse = UnhandledResponse( - type=SpeakWebSocketEvents( - SpeakWebSocketEvents.Unhandled - ), - raw=message, - ) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), - unhandled=unhandled_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - except websockets.exceptions.ConnectionClosedOK as e: - self._logger.notice(f"_listening({e.code}) exiting gracefully") - self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") - return - - except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: - self._logger.notice(f"_listening({e.code}) exiting gracefully") - self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") - return - - self._logger.error( - "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s", - e.code, - e.reason, - ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in AsyncSpeakStreamClient._listening", - f"{e}", - "ConnectionClosed", - ) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Error), - error=cc_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - await self._signal_exit() - - self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") - - if self._config.options.get("termination_exception") == "true": - raise - return - - except websockets.exceptions.WebSocketException as e: - self._logger.error( - "WebSocketException in AsyncSpeakStreamClient._listening: %s", e - ) - ws_error: ErrorResponse = ErrorResponse( - "WebSocketException in AsyncSpeakStreamClient._listening", - f"{e}", - "WebSocketException", - ) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Error), - error=ws_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - await self._signal_exit() - - self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") - - if self._config.options.get("termination_exception") == "true": - raise - return - - except Exception as e: # pylint: disable=broad-except - self._logger.error( - "Exception in AsyncSpeakStreamClient._listening: %s", e - ) - e_error: ErrorResponse = ErrorResponse( - "Exception in AsyncSpeakStreamClient._listening", - f"{e}", - "Exception", - ) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Error), - error=e_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - await self._signal_exit() - - self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") - - if self._config.options.get("termination_exception") == "true": - raise - return - - # pylint: disable=too-many-return-statements - - async def send(self, text_input: str) -> bool: - """ - Sends data over the WebSocket connection. - """ - self._logger.spam("AsyncSpeakStreamClient.send ENTER") - - if self._exit_event.is_set(): - self._logger.notice("send exiting gracefully") - self._logger.debug("AsyncSpeakStreamClient.send LEAVE") - return False - - if self._socket is not None: - try: - await self._socket.send( - json.dumps({"type": "Speak", "text": text_input}) - ) - except websockets.exceptions.ConnectionClosedOK as e: - self._logger.notice(f"send() exiting gracefully: {e.code}") - self._logger.debug("AsyncSpeakStreamClient.send LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return True - except websockets.exceptions.ConnectionClosed as e: - if e.code in [1000, 1001]: - self._logger.notice(f"send({e.code}) exiting gracefully") - self._logger.debug("AsyncSpeakStreamClient.send LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return True - - self._logger.error("send() failed - ConnectionClosed: %s", str(e)) - self._logger.spam("AsyncSpeakStreamClient.send LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return False - except websockets.exceptions.WebSocketException as e: - self._logger.error("send() failed - WebSocketException: %s", str(e)) - self._logger.spam("AsyncSpeakStreamClient.send LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return False - except Exception as e: # pylint: disable=broad-except - self._logger.error("send() failed - Exception: %s", str(e)) - self._logger.spam("AsyncSpeakStreamClient.send LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return False - - self._logger.spam("send() succeeded") - self._logger.spam("AsyncSpeakStreamClient.send LEAVE") - return True - - self._logger.spam("send() failed. socket is None") - self._logger.spam("AsyncSpeakStreamClient.send LEAVE") - return False - - # pylint: enable=too-many-return-statements - - async def flush(self) -> bool: - """ - Flushes the current buffer and returns generated audio - """ - self._logger.spam("AsyncSpeakStreamClient.flush ENTER") - - if self._exit_event.is_set(): - self._logger.notice("flush exiting gracefully") - self._logger.debug("AsyncSpeakStreamClient.flush LEAVE") - return False - - if self._socket is None: - self._logger.notice("socket is not intialized") - self._logger.debug("AsyncSpeakStreamClient.flush LEAVE") - return False - - self._logger.notice("Sending Flush...") - ret = self.send(json.dumps({"type": "Flush"})) - - if not ret: - self._logger.error("flush failed") - self._logger.spam("AsyncSpeakStreamClient.flush LEAVE") - return False - - self._logger.notice("flush succeeded") - self._logger.spam("AsyncSpeakStreamClient.flush LEAVE") - - return True - - async def finish(self) -> bool: - """ - Closes the WebSocket connection gracefully. - """ - self._logger.debug("AsyncSpeakStreamClient.finish ENTER") - - # signal exit - await self._signal_exit() - - # stop the threads - self._logger.verbose("cancelling tasks...") - try: - # Before cancelling, check if the tasks were created - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("before running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - tasks = [] - - if self._listen_thread is not None: - self._listen_thread.cancel() - tasks.append(self._listen_thread) - self._logger.notice("processing _listen_thread cancel...") - - # Use asyncio.gather to wait for tasks to be cancelled - await asyncio.wait_for( - asyncio.gather(*tasks), timeout=10 - ) # Prevent indefinite waiting - self._logger.notice("threads joined") - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("after running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - self._logger.notice("finish succeeded") - self._logger.spam("AsyncSpeakStreamClient.finish LEAVE") - return True - - except asyncio.CancelledError as e: - self._logger.error("tasks cancelled error: %s", e) - self._logger.debug("AsyncSpeakStreamClient.finish LEAVE") - return False - - except asyncio.TimeoutError as e: - self._logger.error("tasks cancellation timed out: %s", e) - self._logger.debug("AsyncSpeakStreamClient.finish LEAVE") - return False - - async def _signal_exit(self) -> None: - # send close event - self._logger.verbose("closing socket...") - if self._socket is not None: - self._logger.verbose("send CloseStream...") - try: - # if the socket connection is closed, the following line might throw an error - await self.send(json.dumps({"type": "CloseStream"})) - except websockets.exceptions.ConnectionClosedOK as e: - self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) - except websockets.exceptions.ConnectionClosed as e: - self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code) - except websockets.exceptions.WebSocketException as e: - self._logger.error("_signal_exit - WebSocketException: %s", str(e)) - except Exception as e: # pylint: disable=broad-except - self._logger.error("_signal_exit - Exception: %s", str(e)) - - # push close event - try: - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Close), - close=CloseResponse(type=SpeakWebSocketEvents.Close), - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - except Exception as e: # pylint: disable=broad-except - self._logger.error("_emit - Exception: %s", e) - - # wait for task to send - await asyncio.sleep(0.5) - - # signal exit - self._exit_event.set() - - # closes the WebSocket connection gracefully - self._logger.verbose("clean up socket...") - if self._socket is not None: - self._logger.verbose("socket.wait_closed...") - try: - await self._socket.close() - except websockets.exceptions.WebSocketException as e: - self._logger.error("socket.wait_closed failed: %s", e) - - self._socket = None # type: ignore diff --git a/deepgram/clients/speak/v1/websocket/client.py b/deepgram/clients/speak/v1/websocket/client.py deleted file mode 100644 index 831e04eb..00000000 --- a/deepgram/clients/speak/v1/websocket/client.py +++ /dev/null @@ -1,558 +0,0 @@ -# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. -# Use of this source code is governed by a MIT license that can be found in the LICENSE file. -# SPDX-License-Identifier: MIT - -import json -import time -import logging -from typing import Dict, Union, Optional, cast, Any -import threading - -from websockets.sync.client import connect, ClientConnection -import websockets - -from .....utils import verboselogs -from .....options import DeepgramClientOptions -from ...enums import SpeakWebSocketEvents -from .helpers import convert_to_websocket_url, append_query_params -from ...errors import DeepgramError - -from .response import ( - OpenResponse, - MetadataResponse, - FlushedResponse, - CloseResponse, - WarningResponse, - ErrorResponse, - UnhandledResponse, -) -from .options import SpeakWebSocketOptions - - -class SpeakWebSocketClient: # pylint: disable=too-many-instance-attributes - """ - Client for interacting with Deepgram's text-to-speech services over WebSockets. - - This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. - - Args: - config (DeepgramClientOptions): all the options for the client. - """ - - _logger: verboselogs.VerboseLogger - _config: DeepgramClientOptions - _endpoint: str - _websocket_url: str - - _socket: ClientConnection - _exit_event: threading.Event - _lock_send: threading.Lock - _event_handlers: Dict[SpeakWebSocketEvents, list] - - _listen_thread: Union[threading.Thread, None] - - _kwargs: Optional[Dict] = None - _addons: Optional[Dict] = None - _options: Optional[Dict] = None - _headers: Optional[Dict] = None - - def __init__(self, config: DeepgramClientOptions): - if config is None: - raise DeepgramError("Config are required") - - self._logger = verboselogs.VerboseLogger(__name__) - self._logger.addHandler(logging.StreamHandler()) - self._logger.setLevel(config.verbose) - - self._config = config - self._endpoint = "v1/speak" - self._lock_send = threading.Lock() - - self._listen_thread = None - - # exit - self._exit_event = threading.Event() - - self._event_handlers = { - event: [] for event in SpeakWebSocketEvents.__members__.values() - } - self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) - - # pylint: disable=too-many-statements,too-many-branches - def start( - self, - options: Optional[Union[SpeakWebSocketOptions, Dict]] = None, - addons: Optional[Dict] = None, - headers: Optional[Dict] = None, - members: Optional[Dict] = None, - **kwargs, - ) -> bool: - """ - Starts the WebSocket connection for text-to-speech synthesis. - """ - self._logger.debug("SpeakStreamClient.start ENTER") - self._logger.info("options: %s", options) - self._logger.info("addons: %s", addons) - self._logger.info("headers: %s", headers) - self._logger.info("members: %s", members) - self._logger.info("kwargs: %s", kwargs) - - if isinstance(options, SpeakWebSocketOptions) and not options.check(): - self._logger.error("options.check failed") - self._logger.debug("SpeakStreamClient.start LEAVE") - raise DeepgramError("Fatal text-to-speech options error") - - self._addons = addons - self._headers = headers - - # add "members" as members of the class - if members is not None: - self.__dict__.update(members) - - # set kwargs as members of the class - if kwargs is not None: - self._kwargs = kwargs - else: - self._kwargs = {} - - if isinstance(options, SpeakWebSocketOptions): - self._logger.info("SpeakWebSocketOptions switching class -> dict") - self._options = options.to_dict() - elif options is not None: - self._options = options - else: - self._options = {} - - combined_options = self._options - if self._addons is not None: - self._logger.info("merging addons to options") - combined_options.update(self._addons) - self._logger.info("new options: %s", combined_options) - self._logger.debug("combined_options: %s", combined_options) - - combined_headers = self._config.headers - if self._headers is not None: - self._logger.info("merging headers to options") - combined_headers.update(self._headers) - self._logger.info("new headers: %s", combined_headers) - self._logger.debug("combined_headers: %s", combined_headers) - - url_with_params = append_query_params(self._websocket_url, combined_options) - try: - self._socket = connect(url_with_params, additional_headers=combined_headers) - self._exit_event.clear() - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("after running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - # listening thread - self._listen_thread = threading.Thread(target=self._listening) - self._listen_thread.start() - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("after running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - # push open event - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Open), - OpenResponse(type=SpeakWebSocketEvents.Open), - ) - - self._logger.notice("start succeeded") - self._logger.debug("SpeakStreamClient.start LEAVE") - return True - except websockets.ConnectionClosed as e: - self._logger.error("ConnectionClosed in SpeakStreamClient.start: %s", e) - self._logger.debug("SpeakStreamClient.start LEAVE") - if self._config.options.get("termination_exception_connect") == "true": - raise e - return False - except websockets.exceptions.WebSocketException as e: - self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e) - self._logger.debug("SpeakStreamClient.start LEAVE") - if self._config.options.get("termination_exception_connect") == "true": - raise e - return False - except Exception as e: # pylint: disable=broad-except - self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e) - self._logger.debug("SpeakStreamClient.start LEAVE") - if self._config.options.get("termination_exception_connect") == "true": - raise e - return False - - # pylint: enable=too-many-statements,too-many-branches - - def on( - self, event: SpeakWebSocketEvents, handler - ) -> None: # registers event handlers for specific events - """ - Registers event handlers for specific events. - """ - self._logger.info("event subscribed: %s", event) - if event in SpeakWebSocketEvents.__members__.values() and callable(handler): - self._event_handlers[event].append(handler) - - def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: - """ - Emits events to the registered event handlers. - """ - self._logger.debug("callback handlers for: %s", event) - for handler in self._event_handlers[event]: - handler(self, *args, **kwargs) - - # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches - def _listening( - self, - ) -> None: - """ - Listens for messages from the WebSocket connection. - """ - self._logger.debug("SpeakStreamClient._listening ENTER") - - while True: - try: - if self._exit_event.is_set(): - self._logger.notice("_listening exiting gracefully") - self._logger.debug("SpeakStreamClient._listening LEAVE") - return - - if self._socket is None: - self._logger.warning("socket is empty") - self._logger.debug("SpeakStreamClient._listening LEAVE") - return - - message = self._socket.recv() - - if message is None: - self._logger.info("message is empty") - continue - - if isinstance(message, bytes): - self._logger.debug("Binary data received") - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), - data=message, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - else: - data = json.loads(message) - response_type = data.get("type") - self._logger.debug( - "response_type: %s, data: %s", response_type, data - ) - - match response_type: - case SpeakWebSocketEvents.Open: - open_result: OpenResponse = OpenResponse.from_json(message) - self._logger.verbose("OpenResponse: %s", open_result) - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Open), - open=open_result, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Metadata: - meta_result: MetadataResponse = MetadataResponse.from_json( - message - ) - self._logger.verbose("MetadataResponse: %s", meta_result) - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), - metadata=meta_result, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Flush: - fl_result: FlushedResponse = FlushedResponse.from_json( - message - ) - self._logger.verbose("FlushedResponse: %s", fl_result) - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Flush), - flushed=fl_result, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Close: - close_result: CloseResponse = CloseResponse.from_json( - message - ) - self._logger.verbose("CloseResponse: %s", close_result) - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Close), - close=close_result, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Warning: - war_warning: WarningResponse = WarningResponse.from_json( - message - ) - self._logger.verbose("WarningResponse: %s", war_warning) - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), - warning=war_warning, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case SpeakWebSocketEvents.Error: - err_error: ErrorResponse = ErrorResponse.from_json(message) - self._logger.verbose("ErrorResponse: %s", err_error) - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Error), - error=err_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - case _: - self._logger.warning( - "Unknown Message: response_type: %s, data: %s", - response_type, - data, - ) - unhandled_error: UnhandledResponse = UnhandledResponse( - type=SpeakWebSocketEvents( - SpeakWebSocketEvents.Unhandled - ), - raw=message, - ) - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), - unhandled=unhandled_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - except websockets.exceptions.ConnectionClosedOK as e: - self._logger.notice(f"_listening({e.code}) exiting gracefully") - self._logger.debug("SpeakStreamClient._listening LEAVE") - return - - except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: - self._logger.notice(f"_listening({e.code}) exiting gracefully") - self._logger.debug("SpeakStreamClient._listening LEAVE") - return - - self._logger.error( - "ConnectionClosed in SpeakStreamClient._listening with code %s: %s", - e.code, - e.reason, - ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in SpeakStreamClient._listening", - f"{e}", - "ConnectionClosed", - ) - self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), cc_error) - - # signal exit and close - self._signal_exit() - - self._logger.debug("SpeakStreamClient._listening LEAVE") - - if self._config.options.get("termination_exception") == "true": - raise - return - - except websockets.exceptions.WebSocketException as e: - self._logger.error( - "WebSocketException in SpeakStreamClient._listening with: %s", e - ) - ws_error: ErrorResponse = ErrorResponse( - "WebSocketException in SpeakStreamClient._listening", - f"{e}", - "WebSocketException", - ) - self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), ws_error) - - # signal exit and close - self._signal_exit() - - self._logger.debug("SpeakStreamClient._listening LEAVE") - - if self._config.options.get("termination_exception") == "true": - raise - return - - except Exception as e: # pylint: disable=broad-except - self._logger.error("Exception in SpeakStreamClient._listening: %s", e) - e_error: ErrorResponse = ErrorResponse( - "Exception in SpeakStreamClient._listening", - f"{e}", - "Exception", - ) - self._logger.error( - "Exception in SpeakStreamClient._listening: %s", str(e) - ) - self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), e_error) - - # signal exit and close - self._signal_exit() - - self._logger.debug("SpeakStreamClient._listening LEAVE") - - if self._config.options.get("termination_exception") == "true": - raise - return - - # pylint: disable=too-many-return-statements - def send(self, text_input: str) -> bool: - """ - Sends data over the WebSocket connection. - """ - self._logger.spam("SpeakStreamClient.send ENTER") - - if self._exit_event.is_set(): - self._logger.notice("send exiting gracefully") - self._logger.debug("SpeakStreamClient.send LEAVE") - return False - - if self._socket is not None: - with self._lock_send: - try: - self._socket.send(json.dumps({"type": "Speak", "text": text_input})) - except websockets.exceptions.ConnectionClosedOK as e: - self._logger.notice(f"send() exiting gracefully: {e.code}") - self._logger.debug("SpeakStreamClient._keep_alive LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return True - except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: - self._logger.notice(f"send({e.code}) exiting gracefully") - self._logger.debug("SpeakStreamClient.send LEAVE") - if ( - self._config.options.get("termination_exception_send") - == "true" - ): - raise - return True - self._logger.error("send() failed - ConnectionClosed: %s", str(e)) - self._logger.spam("SpeakStreamClient.send LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return False - except websockets.exceptions.WebSocketException as e: - self._logger.error("send() failed - WebSocketException: %s", str(e)) - self._logger.spam("SpeakStreamClient.send LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return False - except Exception as e: # pylint: disable=broad-except - self._logger.error("send() failed - Exception: %s", str(e)) - self._logger.spam("SpeakStreamClient.send LEAVE") - if self._config.options.get("termination_exception_send") == "true": - raise - return False - - self._logger.spam("send() succeeded") - self._logger.spam("SpeakStreamClient.send LEAVE") - return True - - self._logger.spam("send() failed. socket is None") - self._logger.spam("SpeakStreamClient.send LEAVE") - return False - - # pylint: enable=too-many-return-statements - - def flush(self) -> bool: - """ - Flushes the current buffer and returns generated audio - """ - self._logger.spam("SpeakStreamClient.flush ENTER") - - if self._exit_event.is_set(): - self._logger.notice("flush exiting gracefully") - self._logger.debug("SpeakStreamClient.flush LEAVE") - return False - - if self._socket is None: - self._logger.notice("socket is not intialized") - self._logger.debug("SpeakStreamClient.flush LEAVE") - return False - - self._logger.notice("Sending Flush...") - ret = self.send(json.dumps({"type": "Flush"})) - - if not ret: - self._logger.error("flush failed") - self._logger.spam("SpeakStreamClient.flush LEAVE") - return False - - self._logger.notice("flush succeeded") - self._logger.spam("SpeakStreamClient.flush LEAVE") - - return True - - # closes the WebSocket connection gracefully - def finish(self) -> bool: - """ - Closes the WebSocket connection gracefully. - """ - self._logger.spam("SpeakStreamClient.finish ENTER") - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("before running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - # signal exit - self._signal_exit() - - # stop the threads - - if self._listen_thread is not None: - self._listen_thread.join() - self._listen_thread = None - self._logger.notice("listening thread joined") - - # debug the threads - for thread in threading.enumerate(): - self._logger.debug("before running thread: %s", thread.name) - self._logger.debug("number of active threads: %s", threading.active_count()) - - self._logger.notice("finish succeeded") - self._logger.spam("SpeakStreamClient.finish LEAVE") - return True - - # signals the WebSocket connection to exit - def _signal_exit(self) -> None: - # closes the WebSocket connection gracefully - self._logger.notice("closing socket...") - if self._socket is not None: - self._logger.notice("sending Close...") - try: - # if the socket connection is closed, the following line might throw an error - self._socket.send(json.dumps({"type": "Close"})) - except websockets.exceptions.ConnectionClosedOK as e: - self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) - except websockets.exceptions.ConnectionClosed as e: - self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) - except websockets.exceptions.WebSocketException as e: - self._logger.error("_signal_exit - WebSocketException: %s", str(e)) - except Exception as e: # pylint: disable=broad-except - self._logger.error("_signal_exit - Exception: %s", str(e)) - - # push close event - try: - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Close), - CloseResponse(type=SpeakWebSocketEvents.Close), - ) - except Exception as e: # pylint: disable=broad-except - self._logger.error("_signal_exit - Exception: %s", e) - - # wait for task to send - time.sleep(0.5) - - # signal exit - self._exit_event.set() - - # closes the WebSocket connection gracefully - self._logger.verbose("clean up socket...") - if self._socket is not None: - self._logger.verbose("socket.wait_closed...") - try: - self._socket.close() - except websockets.exceptions.WebSocketException as e: - self._logger.error("socket.wait_closed failed: %s", e) - - self._socket = None # type: ignore diff --git a/deepgram/clients/speak/v1/websocket/helpers.py b/deepgram/clients/speak/v1/websocket/helpers.py deleted file mode 100644 index dffcdb06..00000000 --- a/deepgram/clients/speak/v1/websocket/helpers.py +++ /dev/null @@ -1,43 +0,0 @@ -from urllib.parse import urlparse, urlunparse, parse_qs, urlencode -from typing import Dict, Optional -import re - - -# This function appends query parameters to a URL -def append_query_params(url: str, params: Optional[Dict] = None): - """ - Appends query parameters to a URL - """ - parsed_url = urlparse(url) - query_params = parse_qs(parsed_url.query) - - if params is not None: - for key, value in params.items(): - if value is None: - continue - if isinstance(value, bool): - value = str(value).lower() - if isinstance(value, list): - for item in value: - query_params[key] = query_params.get(key, []) + [str(item)] - else: - query_params[key] = [str(value)] - - updated_query_string = urlencode(query_params, doseq=True) - updated_url = parsed_url._replace(query=updated_query_string).geturl() - return updated_url - - -# This function converts a URL to a WebSocket URL -def convert_to_websocket_url(base_url: str, endpoint: str): - """ - Converts a URL to a WebSocket URL - """ - if re.match(r"^https?://", base_url, re.IGNORECASE): - base_url = base_url.replace("https://", "").replace("http://", "") - if not re.match(r"^wss?://", base_url, re.IGNORECASE): - base_url = "wss://" + base_url - parsed_url = urlparse(base_url) - domain = parsed_url.netloc - websocket_url = urlunparse((parsed_url.scheme, domain, endpoint, "", "", "")) - return websocket_url diff --git a/deepgram/clients/speak/v1/websocket/options.py b/deepgram/clients/speak/v1/websocket/options.py deleted file mode 100644 index a5d78752..00000000 --- a/deepgram/clients/speak/v1/websocket/options.py +++ /dev/null @@ -1,7 +0,0 @@ -# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. -# Use of this source code is governed by a MIT license that can be found in the LICENSE file. -# SPDX-License-Identifier: MIT - -from ..options import SpeakOptions - -SpeakWebSocketOptions = SpeakOptions diff --git a/deepgram/clients/speak/v1/websocket/response.py b/deepgram/clients/speak/v1/websocket/response.py deleted file mode 100644 index 87f1d466..00000000 --- a/deepgram/clients/speak/v1/websocket/response.py +++ /dev/null @@ -1,185 +0,0 @@ -# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. -# Use of this source code is governed by a MIT license that can be found in the LICENSE file. -# SPDX-License-Identifier: MIT - - -from typing import Optional -import io - -from dataclasses import dataclass, field -from dataclasses_json import config as dataclass_config, DataClassJsonMixin - -# Speak Response Types: - - -@dataclass -class SpeakWebSocketResponse( - DataClassJsonMixin -): # pylint: disable=too-many-instance-attributes - """ - A class for representing a response from the speak (streaming) endpoint. - """ - - content_type: str = "" - request_id: str = "" - model_uuid: str = "" - model_name: str = "" - date: str = "" - stream: Optional[io.BytesIO] = field( - default=None, metadata=dataclass_config(exclude=lambda f: f is None) - ) - - def __getitem__(self, key): - _dict = self.to_dict() - return _dict[key] - - def __setitem__(self, key, val): - self.__dict__[key] = val - - # this is a hack to make the response look like a dict because of the io.BytesIO object - # otherwise it will throw an exception on printing - def __str__(self) -> str: - my_dict = self.to_dict() - return my_dict.__str__() - - -@dataclass -class OpenResponse(DataClassJsonMixin): - """ - Open Message from the Deepgram Platform - """ - - type: str = "" - - def __getitem__(self, key): - _dict = self.to_dict() - return _dict[key] - - def __setitem__(self, key, val): - self.__dict__[key] = val - - def __str__(self) -> str: - return self.to_json(indent=4) - - -@dataclass -class MetadataResponse(DataClassJsonMixin): - """ - Metadata object - """ - - request_id: str = "" - - def __getitem__(self, key): - _dict = self.to_dict() - return _dict[key] - - def __setitem__(self, key, val): - self.__dict__[key] = val - - def __str__(self) -> str: - return self.to_json(indent=4) - - -@dataclass -class FlushedResponse(DataClassJsonMixin): - """ - Flushed Message from the Deepgram Platform - """ - - type: str = "" - - def __getitem__(self, key): - _dict = self.to_dict() - return _dict[key] - - def __setitem__(self, key, val): - self.__dict__[key] = val - - def __str__(self) -> str: - return self.to_json(indent=4) - - -@dataclass -class CloseResponse(DataClassJsonMixin): - """ - Close Message from the Deepgram Platform - """ - - type: str = "" - - def __getitem__(self, key): - _dict = self.to_dict() - return _dict[key] - - def __setitem__(self, key, val): - self.__dict__[key] = val - - def __str__(self) -> str: - return self.to_json(indent=4) - - -@dataclass -class ErrorResponse(DataClassJsonMixin): - """ - Error Message from the Deepgram Platform - """ - - description: str = "" - message: str = "" - type: str = "" - variant: Optional[str] = "" - - def __getitem__(self, key): - _dict = self.to_dict() - return _dict[key] - - def __setitem__(self, key, val): - self.__dict__[key] = val - - def __str__(self) -> str: - return self.to_json(indent=4) - - -@dataclass -class WarningResponse(DataClassJsonMixin): - """ - Warning Message from the Deepgram Platform - """ - - warn_code: str = "" - warn_msg: str = "" - type: str = "" - - def __getitem__(self, key): - _dict = self.to_dict() - return _dict[key] - - def __setitem__(self, key, val): - self.__dict__[key] = val - - def __str__(self) -> str: - return self.to_json(indent=4) - - -# Unhandled Message - - -@dataclass -class UnhandledResponse(DataClassJsonMixin): - """ - Unhandled Message from the Deepgram Platform - """ - - type: str = "" - raw: str = "" - - def __getitem__(self, key): - _dict = self.to_dict() - return _dict[key] - - def __setitem__(self, key, val): - self.__dict__[key] = val - - def __str__(self) -> str: - return self.to_json(indent=4) diff --git a/examples/text-to-speech/websocket/async_interactive/main.py b/examples/text-to-speech/websocket/async_interactive/main.py deleted file mode 100644 index 7e44a8e9..00000000 --- a/examples/text-to-speech/websocket/async_interactive/main.py +++ /dev/null @@ -1,103 +0,0 @@ -from dotenv import load_dotenv -import asyncio -from websockets.exceptions import ConnectionClosedError -from deepgram.utils import verboselogs -from deepgram import ( - DeepgramClient, - DeepgramClientOptions, - SpeakWebSocketEvents, - SpeakOptions, -) - -load_dotenv() - -TTS_TEXT = "Hello, this is a text to speech example using Deepgram." -AUDIO_FILE = "output.mp3" - - -async def main(): - try: - # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM - config: DeepgramClientOptions = DeepgramClientOptions(verbose=verboselogs.DEBUG) - deepgram: DeepgramClient = DeepgramClient("", config) - # otherwise, use default config - # deepgram: DeepgramClient = DeepgramClient() - - # Create a websocket connection to Deepgram - dg_connection = deepgram.speak.asyncwebsocket.v("1") - - async def on_open(client, open_response, **kwargs): - print(f"\n\nOpen: {open_response}\n\n") - await send_tts_text(client) - - async def on_binary_data(client, data, **kwargs): - print("Received binary data") - await write_binary_to_mp3(data) - - async def on_metadata(client, metadata, **kwargs): - print(f"\n\nMetadata: {metadata}\n\n") - - async def on_flush(client, flush, **kwargs): - print(f"\n\nFlush: {flush}\n\n") - - async def on_close(client, close, **kwargs): - print(f"\n\nClose: {close}\n\n") - - async def on_warning(client, warning, **kwargs): - print(f"\n\nWarning: {warning}\n\n") - - async def on_error(client, error, **kwargs): - print(f"\n\nError: {error}\n\n") - - async def on_unhandled(client, unhandled, **kwargs): - print(f"\n\nUnhandled: {unhandled}\n\n") - - async def write_binary_to_mp3(data): - loop = asyncio.get_running_loop() - try: - with open(AUDIO_FILE, "ab") as f: - await loop.run_in_executor(None, f.write, data) - except Exception as e: - print(f"Failed to write data to file: {e}") - finally: - print("File operation completed.") - - dg_connection.on(SpeakWebSocketEvents.Open, on_open) - dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) - dg_connection.on(SpeakWebSocketEvents.Metadata, on_metadata) - dg_connection.on(SpeakWebSocketEvents.Flush, on_flush) - dg_connection.on(SpeakWebSocketEvents.Close, on_close) - dg_connection.on(SpeakWebSocketEvents.Warning, on_warning) - dg_connection.on(SpeakWebSocketEvents.Error, on_error) - dg_connection.on(SpeakWebSocketEvents.Unhandled, on_unhandled) - - async def send_tts_text(client): - await client.send(TTS_TEXT) - - # Connect to the WebSocket - options = SpeakOptions(model="aura-asteria-en") - - if not await dg_connection.start(options): - print("Failed to start connection") - return - - # Wait for user input to finish - await asyncio.get_event_loop().run_in_executor( - None, input, "\n\nPress Enter to stop...\n\n" - ) - await dg_connection.finish() - - print("Finished") - - except ConnectionClosedError as e: - print(f"WebSocket connection closed unexpectedly: {e}") - except asyncio.CancelledError as e: - print(f"Asyncio task was cancelled: {e}") - except OSError as e: - print(f"File operation failed: {e}") - except Exception as e: - print(f"An unexpected error occurred: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/text-to-speech/websocket/interactive/main.py b/examples/text-to-speech/websocket/interactive/main.py deleted file mode 100644 index 3082a09c..00000000 --- a/examples/text-to-speech/websocket/interactive/main.py +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. -# Use of this source code is governed by a MIT license that can be found in the LICENSE file. -# SPDX-License-Identifier: MIT - -from dotenv import load_dotenv -import threading -from websockets.exceptions import ConnectionClosedError -from deepgram.utils import verboselogs - -from deepgram import ( - DeepgramClient, - DeepgramClientOptions, - SpeakWebSocketEvents, - SpeakOptions, -) - -load_dotenv() - - -TTS_TEXT = "Hello, this is a text to speech example using Deepgram." -AUDIO_FILE = "output.mp3" - - -def main(): - try: - # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM - config: DeepgramClientOptions = DeepgramClientOptions(verbose=verboselogs.DEBUG) - deepgram: DeepgramClient = DeepgramClient("", config) - # otherwise, use default config - # deepgram: DeepgramClient = DeepgramClient() - - # Create a websocket connection to Deepgram - dg_connection = deepgram.speak.websocket.v("1") - # print(dg_connection) - - def on_open(self, open, **kwargs): - print(f"\n\n{open}\n\n") - thread = threading.Thread(target=send_tts_text, args=(self,)) - thread.start() - thread.join() - - def on_binary_data(self, data, **kwargs): - print("Received binary data:") - with open(AUDIO_FILE, "ab") as f: - f.write(data) - - def on_metadata(self, metadata, **kwargs): - print(f"\n\n{metadata}\n\n") - - def on_flush(self, flush, **kwargs): - print(f"\n\n{flush}\n\n") - - def on_close(self, close, **kwargs): - print(f"\n\n{close}\n\n") - - def on_warning(self, warning, **kwargs): - print(f"\n\n{warning}\n\n") - - def on_error(self, error, **kwargs): - print(f"\n\n{error}\n\n") - - def on_unhandled(self, unhandled, **kwargs): - print(f"\n\n{unhandled}\n\n") - - dg_connection.on(SpeakWebSocketEvents.Open, on_open) - dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) - dg_connection.on(SpeakWebSocketEvents.Metadata, on_metadata) - dg_connection.on(SpeakWebSocketEvents.Flush, on_flush) - dg_connection.on(SpeakWebSocketEvents.Close, on_close) - dg_connection.on(SpeakWebSocketEvents.Error, on_error) - dg_connection.on(SpeakWebSocketEvents.Warning, on_warning) - dg_connection.on(SpeakWebSocketEvents.Unhandled, on_unhandled) - - lock = threading.Lock() - - def send_tts_text(dg_connection): - with lock: - dg_connection.send(TTS_TEXT) - - # connect to websocket - options = SpeakOptions(model="aura-asteria-en") - - print("\n\nPress Enter to stop...\n\n") - if dg_connection.start(options) is False: - print("Failed to start connection") - return - - # Indicate that we've finished - input("\n\nPress Enter to stop...\n\n") - dg_connection.finish() - - print("Finished") - - except ConnectionClosedError as e: - print(f"WebSocket connection closed unexpectedly: {e}") - except ValueError as e: - print(f"Invalid value encountered: {e}") - except Exception as e: - print(f"An unexpected error occurred: {e}") - - -if __name__ == "__main__": - main()