-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
221 lines (175 loc) · 6.16 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
from sounds import play_sound
from button import wait_for_button, start_monitoring
# Runs at import time, before the remaining imports below: play the "low_sound"
# cue, then call wait_for_button().
# NOTE(review): presumably wait_for_button() blocks until the hardware button is
# pressed, which would delay the later imports until then — confirm in button.py.
play_sound("low_sound")
wait_for_button()
import asyncio
import google.generativeai as genai
from time import sleep
from PIL import Image
from search import search
from voice import record_and_transcribe
from webcam import cam, take_pic
from tts import tts
import azurespeech
from azurespeech import text_to_speech
from prompts import prompts
from ai import model, conv, Message, Role
import button
import threading
from google.api_core.exceptions import InternalServerError
from superfastconvo import superfastconvo_record_and_transcribe
from groq import Stream
from google.generativeai.types import GenerateContentResponse
from setup_logging import logger
import signal
# Custom exception to indicate interruption from a thread
class ThreadKeyboardInterrupt(Exception):
    """Raised by the SIGINT handler in place of the built-in KeyboardInterrupt."""
def signal_handler(signum, frame):
    """
    Signal handler that converts a delivered signal into ThreadKeyboardInterrupt.

    Args:
        signum: The signal number passed in by the signal module (unused).
        frame: The interrupted stack frame passed in by the signal module (unused).

    Raises:
        ThreadKeyboardInterrupt: Always.
    """
    raise ThreadKeyboardInterrupt
# Install signal_handler for SIGINT (Ctrl+C) so the interrupt is raised as
# ThreadKeyboardInterrupt instead of the default KeyboardInterrupt.
signal.signal(signal.SIGINT, signal_handler)
async def shutdown(loop: asyncio.AbstractEventLoop):
    """
    Function to shutdown the async tasks and the event loop.

    Cancels every task except the one running this coroutine, waits for all of
    them to finish, and then stops the given loop.

    Args:
        loop (asyncio.AbstractEventLoop): The loop to shutdown.
    """
    logger.debug("Shutting down async tasks...")
    current = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks() if t is not current]
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each task's CancelledError (or other
    # exception) as a result instead of re-raising it here.
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()
def _generate_with_retry(prompt, attempts: int = 3):
    """Call model.model.generate_content(prompt, stream=True), retrying on InternalServerError.

    Up to `attempts` calls are made; the error from the last attempt propagates.
    """
    for remaining in range(attempts - 1, -1, -1):
        try:
            return model.model.generate_content(prompt, stream=True)
        except InternalServerError as e:
            if remaining == 0:
                raise
            print("ERROR", e)
            print("Trying again...")


async def main(question: str, pic: Image.Image = None):
    """
    The main function which is run when JARVIS is activated.

    Checks whether the picture is relevant, checks whether a search is needed,
    builds the final prompt, streams the model's answer to stdout and speaks it
    with text_to_speech. The first complete sentence is spoken in a background
    thread while the rest of the answer is still streaming.

    Args:
        question (str): The question from the user.
        pic (Image.Image, optional): The photo from the camera. Defaults to None.
    """
    logger.info("Main function running")

    # Drop the picture for the rest of the run if the "pic_relevance" reply
    # contains "no".
    model.use_vision()
    pic_relevance = await model.prompt("pic_relevance", pic, question=question)
    pic_relevance = pic_relevance.lower()
    print(f"{pic_relevance=}")
    pic = pic if "no" not in pic_relevance else None
    model.choose_model_from(pic)

    logger.debug(f"Checking to search for '{question}'")
    to_search = (await model.prompt("to_search", pic, question=question)).lower()

    # Default to the no-search prompt so a reply containing neither "yes" nor
    # "no" still yields a usable prompt key instead of an empty string.
    final_message = "[final_prompts][no_search]"
    params: dict[str, str] = {"question": question}
    # NOTE(review): "yes" maps to *no* search here; presumably the "to_search"
    # prompt asks whether the question can be answered without searching —
    # confirm against prompts.
    if "yes" in to_search:
        logger.debug("No search")
    elif "no" in to_search:
        logger.debug("Search")
        formatted_search_contents, search_contents = await search(question, model, pic)
        if search_contents is not None:
            params["formatted_search_contents"] = formatted_search_contents
            final_message = "[final_prompts][yes_search]"
    else:
        logger.debug("Unclear to_search reply, not searching")

    model.choose_model_from(pic)
    prompt = model.get_prompt(final_message, **params)
    if pic is None:
        # Text-only path: streamed chat completion.
        stream: Stream = model.model.chat.completions.create(
            messages=[Message(Role.USER, prompt).json(model.model)],
            model=model.groq_model,
            stream=True,
        )
    else:
        # Picture path: streamed generate_content call, retried on server errors.
        stream: GenerateContentResponse = _generate_with_retry([pic, prompt])

    # Sentence endings, tried in this order: followed by a space, followed by
    # a newline, then bare. Built once instead of on every chunk.
    base_endings = [".", "?!", "!?", "!", "?", ":"]
    complete_endings = [f"{x} " for x in base_endings]
    complete_endings.extend(f"{x}\n" for x in base_endings)
    complete_endings.extend(base_endings)

    response: str = ""  # streamed text not yet handed to text_to_speech
    full_response: str = ""
    thread = None
    print("AI: ", end="")
    for chunk in stream:
        if pic is None:
            c = chunk.choices[0].delta.content
        else:
            c = chunk.text
        if c:
            response += c
            try:
                print(c, end="")
            except UnicodeEncodeError:
                print("UnicodeEncodeError")
        if thread is None:
            for x in complete_endings:
                if x not in response:
                    continue
                # Split at the last occurrence of this ending: everything up to
                # it is spoken now, the remainder keeps accumulating.
                response1, response = response.rsplit(x, 1)
                response1 += x
                full_response = response1
                thread = threading.Thread(
                    target=text_to_speech, args=(response1.strip(),)
                )
                thread.start()
                # Stop after the first match; continuing would start a second
                # speech thread and overwrite full_response.
                break

    if thread is not None:
        thread.join()
    full_response += response
    response = response.strip()
    azurespeech.ds = 0
    button.running = True
    text_to_speech(response)

    # Wait until azurespeech.ds has stayed at 0 for 10 consecutive polls,
    # 0.1 s apart.
    # NOTE(review): the meaning of `ds` is not visible here — confirm in azurespeech.
    counter = 0
    while counter < 10:
        if azurespeech.ds == 0:
            counter += 1
        else:
            counter = 0
        sleep(0.1)
    azurespeech.ds = 0

    if not full_response.strip():
        # TODO: find out why sometimes the response is empty
        print(stream.response)
    print()
    button.running = False
    logger.debug("Main function finished")
async def test(question: str, pic: Image.Image = None) -> None:
    """
    Testing function.

    Placeholder: currently only logs a debug message and ignores its arguments.

    Args:
        question (str): The question from the user.
        pic (Image.Image, optional): The photo from the camera. Defaults to None.
    """
    logger.debug("Testing...")
    ...
# Script body: create a fresh event loop, call start_monitoring(), then pass
# main and the loop to record_and_transcribe until it returns or is interrupted.
loop = asyncio.new_event_loop()
start_monitoring()
try:
    record_and_transcribe(
        main, loop=loop, superfastconvojarvis=superfastconvo_record_and_transcribe
    )
except (KeyboardInterrupt, ThreadKeyboardInterrupt):
    logger.debug("Ctrl+C detected, exiting...")
    print("Exiting...")
finally:
    # closing the program
    # In `finally` so the camera is released and pending tasks are cancelled
    # even when an unexpected exception escapes record_and_transcribe.
    cam.close()
    loop.run_until_complete(shutdown(loop))
    # Two "low_sound" cues as the exit signal.
    play_sound("low_sound")
    sleep(0.5)
    play_sound("low_sound")
    sleep(0.75)
    logger.info("Exited")