assistant.py

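"""A LiveKit voice assistant with vision capabilities.

The assistant detects speech with Silero VAD, transcribes it with Deepgram STT,
responds with GPT-4o, and speaks via OpenAI TTS. When a user request needs
vision, the latest frame from the room's video track is attached to the chat
context before answering.
"""
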
import asyncio
from typing import Annotated

from livekit import agents, rtc
from livekit.agents import JobContext, WorkerOptions, cli, tokenize, tts
from livekit.agents.llm import (
    ChatContext,
    ChatImage,
    ChatMessage,
)
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero


class AssistantFunction(agents.llm.FunctionContext):
    """This class is used to define functions that will be called by the assistant."""

    @agents.llm.ai_callable(
        description=(
            "Called when asked to evaluate something that would require vision capabilities, "
            "for example, an image, video, or the webcam feed."
        )
    )
    async def image(
        self,
        user_msg: Annotated[
            str,
            agents.llm.TypeInfo(
                description="The user message that triggered this function"
            ),
        ],
    ):
        print(f"Message triggering vision capabilities: {user_msg}")
        return None


async def get_video_track(room: rtc.Room):
    """Get the first video track from the room. We'll use this track to process images."""

    video_track = asyncio.Future[rtc.RemoteVideoTrack]()

    for _, participant in room.remote_participants.items():
        for _, track_publication in participant.track_publications.items():
            if track_publication.track is not None and isinstance(
                track_publication.track, rtc.RemoteVideoTrack
            ):
                video_track.set_result(track_publication.track)
                print(f"Using video track {track_publication.track.sid}")
                break
        if video_track.done():
            # Stop once a track has been found; setting the future's result twice would raise.
            break

    return await video_track


async def entrypoint(ctx: JobContext):
    await ctx.connect()
    print(f"Room name: {ctx.room.name}")

    chat_context = ChatContext(
        messages=[
            ChatMessage(
                role="system",
                content=(
                    "Your name is Alloy. You are a funny, witty bot. Your interface with users will be voice and vision. "
                    "Respond with short and concise answers. Avoid using unpronounceable punctuation or emojis."
                ),
            )
        ]
    )

    gpt = openai.LLM(model="gpt-4o")

    # Since OpenAI does not support streaming TTS, we'll use it with a StreamAdapter
    # to make it compatible with the VoiceAssistant
    openai_tts = tts.StreamAdapter(
        tts=openai.TTS(voice="alloy"),
        sentence_tokenizer=tokenize.basic.SentenceTokenizer(),
    )

    latest_image: rtc.VideoFrame | None = None

    assistant = VoiceAssistant(
        vad=silero.VAD.load(),  # We'll use Silero's Voice Activity Detector (VAD)
        stt=deepgram.STT(),  # We'll use Deepgram's Speech To Text (STT)
        llm=gpt,
        tts=openai_tts,  # We'll use OpenAI's Text To Speech (TTS)
        fnc_ctx=AssistantFunction(),
        chat_ctx=chat_context,
    )

    chat = rtc.ChatManager(ctx.room)

    async def _answer(text: str, use_image: bool = False):
        """
        Answer the user's message with the given text and optionally the latest
        image captured from the video track.
        """
        content: list[str | ChatImage] = [text]
        if use_image and latest_image:
            content.append(ChatImage(image=latest_image))

        chat_context.messages.append(ChatMessage(role="user", content=content))

        stream = gpt.chat(chat_ctx=chat_context)
        await assistant.say(stream, allow_interruptions=True)

    @chat.on("message_received")
    def on_message_received(msg: rtc.ChatMessage):
        """This event triggers whenever we get a new message from the user."""

        if msg.message:
            asyncio.create_task(_answer(msg.message, use_image=False))

    @assistant.on("function_calls_finished")
    def on_function_calls_finished(called_functions: list[agents.llm.CalledFunction]):
        """This event triggers when an assistant's function call completes."""

        if len(called_functions) == 0:
            return

        user_msg = called_functions[0].call_info.arguments.get("user_msg")
        if user_msg:
            asyncio.create_task(_answer(user_msg, use_image=True))

    assistant.start(ctx.room)

    await asyncio.sleep(1)
    await assistant.say("Hi there! How can I help?", allow_interruptions=True)

    while ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED:
        video_track = await get_video_track(ctx.room)

        async for event in rtc.VideoStream(video_track):
            # We'll continually grab the latest image from the video track
            # and store it in a variable.
            latest_image = event.frame


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
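
# Typically run with the LiveKit Agents CLI, e.g. `python assistant.py dev`.
# The worker is assumed to read LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET
# from the environment; Deepgram and OpenAI keys (DEEPGRAM_API_KEY, OPENAI_API_KEY)
# are needed for the STT, LLM, and TTS plugins used above.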