Skip to content

Commit

Permalink
Merge pull request #1039 from pipecat-ai/aleix/fish-audio-websocket-service
Browse files
Browse the repository at this point in the history

services(fish): FishAudioTTSService to use WebsocketService
  • Loading branch information
aconchillo authored Jan 18, 2025
2 parents ea44c59 + e0011a3 commit 2abbd4b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 31 deletions.
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ dependencies = [
"protobuf~=5.29.3",
"pydantic~=2.10.5",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"tenacity~=9.0.0"
"resampy~=0.4.3"
]

[project.urls]
Expand Down Expand Up @@ -63,7 +62,7 @@ fireworks = [ "openai~=1.59.6" ]
krisp = [ "pipecat-ai-krisp~=0.3.0" ]
koala = [ "pvkoala~=2.0.3" ]
langchain = [ "langchain~=0.3.14", "langchain-community~=0.3.14", "langchain-openai~=0.3.0" ]
livekit = [ "livekit~=0.19.1", "livekit-api~=0.8.1" ]
livekit = [ "livekit~=0.19.1", "livekit-api~=0.8.1", "tenacity~=9.0.0" ]
lmnt = [ "websockets~=13.1" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
Expand Down
33 changes: 5 additions & 28 deletions src/pipecat/services/fish.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import uuid
from typing import AsyncGenerator, Literal, Optional

from loguru import logger
from pydantic import BaseModel
from tenacity import AsyncRetrying, RetryCallState, stop_after_attempt, wait_exponential

from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
Expand All @@ -28,6 +26,7 @@
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import TTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language

try:
Expand All @@ -44,7 +43,7 @@
FishAudioOutputFormat = Literal["opus", "mp3", "pcm", "wav"]


class FishAudioTTSService(TTSService):
class FishAudioTTSService(TTSService, WebsocketService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
latency: Optional[str] = "normal" # "normal" or "balanced"
Expand Down Expand Up @@ -105,7 +104,9 @@ async def cancel(self, frame: CancelFrame):

async def _connect(self):
await self._connect_websocket()
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
self._receive_task = self.get_event_loop().create_task(
self._receive_task_handler(self.push_error)
)

async def _disconnect(self):
await self._disconnect_websocket()
Expand Down Expand Up @@ -169,30 +170,6 @@ async def _receive_messages(self):
except Exception as e:
logger.error(f"Error processing message: {e}")

async def _reconnect_websocket(self, retry_state: RetryCallState):
logger.warning(f"Fish Audio reconnecting (attempt: {retry_state.attempt_number})")
await self._disconnect_websocket()
await self._connect_websocket()

async def _receive_task_handler(self):
while True:
try:
async for attempt in AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
before_sleep=self._reconnect_websocket,
reraise=True,
):
with attempt:
await self._receive_messages()
except asyncio.CancelledError:
break
except Exception as e:
message = f"Fish Audio error receiving messages: {e}"
logger.error(message)
await self.push_error(ErrorFrame(message, fatal=True))
break

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

Expand Down

0 comments on commit 2abbd4b

Please sign in to comment.