Skip to content

Commit

Permalink
Implement wait_for_complete() for Speaker class
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvonthenen committed Sep 29, 2024
1 parent b233b60 commit feb2680
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 9 deletions.
1 change: 1 addition & 0 deletions deepgram/audio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
CHANNELS as OUTPUT_CHANNELS,
RATE as OUTPUT_RATE,
CHUNK as OUTPUT_CHUNK,
PLAYBACK_DELTA as OUTPUT_PLAYBACK_DELTA,
)
2 changes: 1 addition & 1 deletion deepgram/audio/speaker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

from .speaker import Speaker
from .errors import DeepgramSpeakerError
from .constants import LOGGING, CHANNELS, RATE, CHUNK
from .constants import LOGGING, CHANNELS, RATE, CHUNK, PLAYBACK_DELTA
5 changes: 4 additions & 1 deletion deepgram/audio/speaker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

from ...utils import verboselogs

# Constants for microphone
# Constants for speaker
LOGGING = verboselogs.WARNING
TIMEOUT = 0.050
CHANNELS = 1
RATE = 16000
CHUNK = 8194

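# Default idle threshold in milliseconds: how long no new audio may arrive before Speaker.wait_for_complete() treats playback as done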
PLAYBACK_DELTA = 2000
53 changes: 52 additions & 1 deletion deepgram/audio/speaker/speaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
import threading
from typing import Optional, Callable, Union, TYPE_CHECKING
import logging
from datetime import datetime

import websockets

from ...utils import verboselogs
from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT
from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT, PLAYBACK_DELTA

if TYPE_CHECKING:
import pyaudio

HALF_SECOND = 0.5


class Speaker: # pylint: disable=too-many-instance-attributes
"""
Expand All @@ -33,6 +36,11 @@ class Speaker: # pylint: disable=too-many-instance-attributes
_channels: int
_output_device_index: Optional[int] = None

# last time we received audio
_last_datagram: datetime = datetime.now()
_last_play_delta_in_ms: int
_lock_wait: threading.Lock

_queue: queue.Queue
_exit: threading.Event

Expand All @@ -56,6 +64,7 @@ def __init__(
rate: int = RATE,
chunk: int = CHUNK,
channels: int = CHANNELS,
last_play_delta_in_ms: int = PLAYBACK_DELTA,
output_device_index: Optional[int] = None,
): # pylint: disable=too-many-positional-arguments
# dynamic import of pyaudio as not to force the requirements on the SDK (and users)
Expand All @@ -68,11 +77,15 @@ def __init__(
self._exit = threading.Event()
self._queue = queue.Queue()

self._last_datagram = datetime.now()
self._lock_wait = threading.Lock()

self._audio = pyaudio.PyAudio()
self._chunk = chunk
self._rate = rate
self._format = pyaudio.paInt16
self._channels = channels
self._last_play_delta_in_ms = last_play_delta_in_ms
self._output_device_index = output_device_index

self._push_callback_org = push_callback
Expand Down Expand Up @@ -192,6 +205,42 @@ def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool

return True

def wait_for_complete(self):
    """
    Block the calling thread until audio playback appears to have finished.

    Playback is treated as finished once the time since ``_last_datagram``
    (refreshed by ``_play`` each time it pulls audio off the queue) reaches
    ``_last_play_delta_in_ms`` milliseconds. The timestamp is reset to "now"
    on entry, so the wait lasts at least that long unless ``_exit`` is set
    first. The condition is re-checked every ``HALF_SECOND`` seconds.
    """
    self._logger.debug("Speaker.wait_for_complete ENTER")

    threshold_ms = float(self._last_play_delta_in_ms)
    self._logger.debug("Last Play delta: %f", threshold_ms)

    # start measuring idle time from the moment the caller begins waiting
    with self._lock_wait:
        self._last_datagram = datetime.now()

    finished = False
    while not finished:
        # pause between polls; wakes early if the speaker is shutting down
        self._exit.wait(HALF_SECOND)

        if self._exit.is_set():
            self._logger.debug("Exiting wait_for_complete _exit is set")
            finished = True
        else:
            # _last_datagram is shared with the playback thread, read it under the lock
            with self._lock_wait:
                idle = datetime.now() - self._last_datagram
                idle_ms = idle.total_seconds() * 1000
                still_playing = idle_ms < threshold_ms
                if still_playing:
                    self._logger.debug("LastPlay delta is less than threshold")

            if not still_playing:
                # no audio for at least the threshold: consider playback done
                self._logger.debug(
                    "LastPlay delta is greater than threshold. Exit wait!"
                )
                finished = True

    self._logger.debug("Speaker.wait_for_complete LEAVE")

def _start_receiver(self):
# Check if the socket is an asyncio WebSocket
if inspect.iscoroutinefunction(self._pull_callback_org):
Expand Down Expand Up @@ -315,6 +364,8 @@ def _play(self, audio_out, stream, stop):
while not stop.is_set():
try:
data = audio_out.get(True, TIMEOUT)
with self._lock_wait:
self._last_datagram = datetime.now()
stream.write(data)
except queue.Empty:
pass
Expand Down
24 changes: 23 additions & 1 deletion deepgram/clients/speak/v1/websocket/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from .options import SpeakWSOptions

from .....audio.speaker import Speaker, RATE, CHANNELS
from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA

ONE_SECOND = 1
HALF_SECOND = 0.5
Expand Down Expand Up @@ -93,6 +93,11 @@ def __init__(self, config: DeepgramClientOptions):
channels = self._config.options.get("speaker_playback_channels")
if channels is None:
channels = CHANNELS
playback_delta_in_ms = self._config.options.get(
"speaker_playback_delta_in_ms"
)
if playback_delta_in_ms is None:
playback_delta_in_ms = PLAYBACK_DELTA
device_index = self._config.options.get("speaker_playback_device_index")

self._logger.debug("rate: %s", rate)
Expand All @@ -103,13 +108,15 @@ def __init__(self, config: DeepgramClientOptions):
self._speaker = Speaker(
rate=rate,
channels=channels,
last_play_delta_in_ms=playback_delta_in_ms,
verbose=self._config.verbose,
output_device_index=device_index,
)
else:
self._speaker = Speaker(
rate=rate,
channels=channels,
last_play_delta_in_ms=playback_delta_in_ms,
verbose=self._config.verbose,
)

Expand Down Expand Up @@ -590,6 +597,21 @@ async def clear(self) -> bool:

return True

async def wait_for_complete(self):
    """
    Wait until the speaker is done playing sound.

    The speaker's own ``wait_for_complete`` is a blocking call, so it is run
    in the event loop's default executor and awaited; the event loop itself
    stays free while this coroutine is suspended.

    Returns immediately (after logging an error) when no speaker is attached.
    """
    self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER")

    if self._speaker is None:
        self._logger.error("speaker is None. Return immediately")
        return

    # inside a coroutine a loop is always running; get_running_loop() is the
    # preferred accessor and never creates a new loop as a side effect
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, self._speaker.wait_for_complete)
    self._logger.notice("wait_for_complete succeeded")
    self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE")

async def _close_message(self) -> bool:
return await self.send_control(SpeakWebSocketMessage.Close)

Expand Down
23 changes: 22 additions & 1 deletion deepgram/clients/speak/v1/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from .options import SpeakWSOptions

from .....audio.speaker import Speaker, RATE, CHANNELS
from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA

ONE_SECOND = 1
HALF_SECOND = 0.5
Expand Down Expand Up @@ -96,6 +96,11 @@ def __init__(self, config: DeepgramClientOptions):
channels = self._config.options.get("speaker_playback_channels")
if channels is None:
channels = CHANNELS
playback_delta_in_ms = self._config.options.get(
"speaker_playback_delta_in_ms"
)
if playback_delta_in_ms is None:
playback_delta_in_ms = PLAYBACK_DELTA
device_index = self._config.options.get("speaker_playback_device_index")

self._logger.debug("rate: %s", rate)
Expand All @@ -106,13 +111,15 @@ def __init__(self, config: DeepgramClientOptions):
self._speaker = Speaker(
rate=rate,
channels=channels,
last_play_delta_in_ms=playback_delta_in_ms,
verbose=self._config.verbose,
output_device_index=device_index,
)
else:
self._speaker = Speaker(
rate=rate,
channels=channels,
last_play_delta_in_ms=playback_delta_in_ms,
verbose=self._config.verbose,
)

Expand Down Expand Up @@ -589,6 +596,20 @@ def clear(self) -> bool:

return True

def wait_for_complete(self):
    """
    Block the caller until the attached speaker is done playing sound.

    Delegates to the speaker's ``wait_for_complete``. When no speaker is
    attached, an error is logged and the call returns immediately.
    """
    self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER")

    if self._speaker is not None:
        # hand off to the speaker, which does the actual waiting
        self._speaker.wait_for_complete()
        self._logger.notice("wait_for_complete succeeded")
        self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE")
    else:
        self._logger.error("speaker is None. Return immediately")

def _close_message(self) -> bool:
return self.send_control(SpeakWebSocketMessage.Close)

Expand Down
10 changes: 7 additions & 3 deletions examples/text-to-speech/websocket/async_complete/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ async def main():

# example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
config: DeepgramClientOptions = DeepgramClientOptions(
options={"auto_flush_speak_delta": "500", "speaker_playback": "true"},
# verbose=verboselogs.SPAM,
options={
# "auto_flush_speak_delta": "500",
"speaker_playback": "true"
},
verbose=verboselogs.SPAM,
)
deepgram: DeepgramClient = DeepgramClient("", config)

Expand Down Expand Up @@ -99,11 +102,12 @@ async def on_unhandled(self, unhandled, **kwargs):

# send the text to Deepgram
await dg_connection.send_text(TTS_TEXT)

# if auto_flush_speak_delta is not used, you must flush the connection by calling flush()
await dg_connection.flush()

# Indicate that we've finished
await asyncio.sleep(7)
await dg_connection.wait_for_complete()

# Close the connection
await dg_connection.finish()
Expand Down
4 changes: 3 additions & 1 deletion examples/text-to-speech/websocket/complete/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ def on_unhandled(self, unhandled, **kwargs):

# send the text to Deepgram
dg_connection.send_text(TTS_TEXT)

# if auto_flush_speak_delta is not used, you must flush the connection by calling flush()
dg_connection.flush()

# Indicate that we've finished
time.sleep(5)
dg_connection.wait_for_complete()

print("\n\nPress Enter to stop...\n\n")
input()

Expand Down

0 comments on commit feb2680

Please sign in to comment.