-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathollama_chat.py
4188 lines (3429 loc) · 188 KB
/
ollama_chat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import ollama
import platform
import tempfile
from colorama import Fore, Style
import chromadb
import readline
import base64
import getpass
if platform.system() == "Windows":
import win32clipboard
else:
import pyperclip
import argparse
import re
import os
import sys
import json
import importlib.util
import inspect
from appdirs import AppDirs
from datetime import date, datetime
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import Terminal256Formatter
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from markdownify import MarkdownConverter
import requests
from PyPDF2 import PdfReader
import chardet
from rank_bm25 import BM25Okapi
from pptx import Presentation
from docx import Document
from lxml import etree
# --- Application identity ---------------------------------------------------
APP_NAME = "ollama-chat"
APP_AUTHOR = ""
APP_VERSION = "1.0.0"
# --- Backend selection / clients (populated at startup) ---------------------
use_openai = False
no_system_role=False
openai_client = None
chroma_client = None
# Currently active ChromaDB collection (name + handle)
current_collection_name = None
collection = None
# How many documents a vector-DB query returns
number_of_documents_to_return_from_vector_db = 15
# Default sampling temperature for LLM calls
temperature = 0.1
verbose_mode = False
embeddings_model = None
syntax_highlighting = True
interactive_mode = True
# --- Plugin system ----------------------------------------------------------
plugins = []
plugins_folder = None
selected_tools = [] # Initially no tools selected
# --- Model selection --------------------------------------------------------
current_model = None
alternate_model = None
thinking_model = None
thinking_model_reasoning_pattern = None
# --- Misc runtime state -----------------------------------------------------
memory_manager = None
other_instance_url = None
listening_port = None
initial_message = None
user_prompt = None
# Default ChromaDB client host and port
chroma_client_host = "localhost"
chroma_client_port = 8000
chroma_db_path = None
custom_tools = []
# Reserved collection names, hidden from the user-facing collection list
web_cache_collection_name = "web_cache"
memory_collection_name = "memory"
long_term_memory_file = "long_term_memory.json"
# English stop words used for keyword extraction / BM25 tokenization
stop_words = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"]
# List of available commands to autocomplete
COMMANDS = [
    "/agent", "/context", "/index", "/verbose", "/cot", "/search", "/web", "/model",
    "/thinking_model", "/model2", "/tools", "/save", "/collection", "/memory", "/remember",
    "/memorize", "/forget", "/editcollection", "/rmcollection", "/deletecollection", "/chatbot",
    "/cb", "/file", "/quit", "/exit", "/bye"
]
def completer(text, state):
    """Readline autocomplete hook: return the state-th COMMAND starting with *text*.

    Fixes: the docstring previously sat *after* a `global COMMANDS` statement,
    making it a no-op string expression rather than a docstring; the `global`
    declaration itself was unnecessary since COMMANDS is only read, never rebound.
    """
    options = [cmd for cmd in COMMANDS if cmd.startswith(text)]
    if state < len(options):
        return options[state]
    # readline stops iterating once the hook returns None
    return None
def on_user_input(input_prompt=None):
    """Read a line of user input, giving plugins the first chance to supply it.

    The first plugin whose on_user_input hook returns a truthy value wins;
    otherwise the built-in input() is used (with the prompt when one is given).
    """
    for plugin in plugins:
        hook = getattr(plugin, "on_user_input", None)
        if callable(hook):
            supplied = hook(input_prompt)
            if supplied:
                return supplied
    return input(input_prompt) if input_prompt else input()
def on_print(message, style="", prompt=""):
    """Print *message* unless a plugin's on_print hook reports it handled it.

    Every plugin hook is invoked (not short-circuited); any truthy return
    suppresses the default print. Style/prompt prefixes apply only to the
    fallback console output.
    """
    handled = False
    for plugin in plugins:
        hook = getattr(plugin, "on_print", None)
        if callable(hook):
            handled = hook(message) or handled
    if handled:
        return
    if style or prompt:
        print(f"{style}{prompt}{message}")
    else:
        print(message)
def on_stdout_write(message, style="", prompt=""):
    """Write *message* to stdout (no newline) unless a plugin handles it.

    Mirrors on_print: all plugin hooks run, any truthy return suppresses the
    default sys.stdout.write.
    """
    handled = False
    for plugin in plugins:
        hook = getattr(plugin, "on_stdout_write", None)
        if callable(hook):
            handled = hook(message) or handled
    if handled:
        return
    if style or prompt:
        sys.stdout.write(f"{style}{prompt}{message}")
    else:
        sys.stdout.write(message)
def on_llm_token_response(token, style="", prompt=""):
    """Emit one streamed LLM token to stdout unless a plugin handles it.

    Same dispatch pattern as on_stdout_write, but dedicated to token
    streaming so plugins can intercept generation output specifically.
    """
    handled = False
    for plugin in plugins:
        hook = getattr(plugin, "on_llm_token_response", None)
        if callable(hook):
            handled = hook(token) or handled
    if handled:
        return
    if style or prompt:
        sys.stdout.write(f"{style}{prompt}{token}")
    else:
        sys.stdout.write(token)
def on_prompt(prompt, style=""):
    """Display the interactive prompt string unless a plugin handles it."""
    handled = False
    for plugin in plugins:
        hook = getattr(plugin, "on_prompt", None)
        if callable(hook):
            handled = hook(prompt) or handled
    if not handled:
        sys.stdout.write(f"{style}{prompt}" if style else prompt)
def on_stdout_flush():
    """Flush stdout, unless a plugin's on_stdout_flush hook takes over."""
    handled = False
    for plugin in plugins:
        hook = getattr(plugin, "on_stdout_flush", None)
        if callable(hook):
            handled = hook() or handled
    if not handled:
        sys.stdout.flush()
def get_available_tools():
    """Assemble the list of tool schemas (OpenAI function-calling format)
    offered to the model: the built-in defaults plus plugin-provided tools.

    Side effects: calls load_chroma_client() to ensure the ChromaDB client
    exists, then reads the live collection list so the query_vector_database
    tool advertises current collection names/descriptions.
    """
    global custom_tools
    global chroma_client
    global web_cache_collection_name
    global memory_collection_name
    global current_collection_name
    global selected_tools
    load_chroma_client()
    # List existing collections
    available_collections = []
    available_collections_description = []
    if chroma_client:
        collections = chroma_client.list_collections()
        for collection in collections:
            # Internal bookkeeping collections are hidden from the model.
            if collection.name == web_cache_collection_name or collection.name == memory_collection_name:
                continue
            available_collections.append(collection.name)
            if type(collection.metadata) == dict and "description" in collection.metadata:
                available_collections_description.append(f"{collection.name}: {collection.metadata['description']}")
    # Built-in tool schemas. The collection enum/default below are captured at
    # call time, so this function must be re-invoked after collections change.
    default_tools = [{
        'type': 'function',
        'function': {
            'name': 'web_search',
            'description': 'Perform a web search using DuckDuckGo',
            'parameters': {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query"
                    }
                },
                "required": [
                    "query"
                ]
            }
        }
    },
    {
        'type': 'function',
        'function': {
            'name': 'query_vector_database',
            'description': f'Performs a semantic search using a knowledge base collection.',
            'parameters': {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "The question to search for, in a human-readable format, e.g., 'What is the capital of France?'"
                    },
                    "collection_name": {
                        "type": "string",
                        "description": f"The name of the collection to search in, which must be one of the available collections: {', '.join(available_collections_description)}",
                        "default": current_collection_name,
                        "enum": available_collections
                    },
                    "question_context": {
                        "type": "string",
                        "description": "Current discussion context or topic, based on previous exchanges with the user"
                    }
                },
                "required": [
                    "question",
                    "collection_name",
                    "question_context"
                ]
            }
        }
    },
    {
        'type': 'function',
        'function': {
            'name': 'retrieve_relevant_memory',
            'description': 'Retrieve relevant memories based on a query',
            'parameters': {
                "type": "object",
                "properties": {
                    "query_text": {
                        "type": "string",
                        "description": "The query or question for which relevant memories should be retrieved"
                    },
                    "top_k": {
                        "type": "integer",
                        "description": "Number of relevant memories to retrieve",
                        "default": 3
                    }
                },
                "required": [
                    "query_text"
                ]
            }
        }
    },
    {
        'type': 'function',
        'function': {
            "name": "instantiate_agent_with_tools_and_process_task",
            "description": (
                "Creates an agent with a specified name using a provided system prompt, task, and a list of tools. "
                "Executes the task-solving process and returns the result. The tools must be chosen from a predefined set."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "task": {
                        "type": "string",
                        "description": "The task or problem that the agent needs to solve. Provide a clear and concise description."
                    },
                    "system_prompt": {
                        "type": "string",
                        "description": "The system prompt that defines the agent's behavior, personality, and approach to solving the task."
                    },
                    "tools": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "enum": []
                        },
                        "description": "A list of tools to be used by the agent for solving the task. Must be provided as an array of tool names."
                    },
                    "agent_name": {
                        "type": "string",
                        "description": "A unique name for the agent that will be used for instantiation."
                    },
                    "agent_description": {
                        "type": "string",
                        "description": "A brief description of the agent's purpose and capabilities."
                    }
                },
                "required": ["task", "system_prompt", "tools", "agent_name", "agent_description"]
            }
        }
    }]
    # Find index of instantiate_agent_with_tools_and_process_task function
    index = -1
    for i, tool in enumerate(default_tools):
        if tool['function']['name'] == 'instantiate_agent_with_tools_and_process_task':
            index = i
            break
    # Patch the agent tool so its "tools" enum reflects the currently selected tools.
    # NOTE(review): if the name lookup ever failed, index would stay -1 and the
    # *last* tool would be patched instead — consider an explicit guard.
    default_tools[index]["function"]["parameters"]["properties"]["tools"]["items"]["enum"] = [tool["function"]["name"] for tool in selected_tools]
    default_tools[index]["function"]["parameters"]["properties"]["tools"]["description"] += f" Available tools: {', '.join([tool['function']['name'] for tool in selected_tools])}"
    # Add custom tools from plugins
    available_tools = default_tools + custom_tools
    return available_tools
def generate_chain_of_thoughts_system_prompt(selected_tools):
    """Return the system prompt for the /cot (chain-of-thought) mode.

    Builds a long static "slow-thinking inner monologue" prompt, then appends
    guidance about the currently selected tools (and extra guidance for
    query_vector_database when that tool is among them).
    """
    # NOTE(review): this global is declared but never read or written in this
    # function — candidate for removal.
    global current_collection_name
    # Base prompt
    prompt = """
You are an advanced **slow-thinking assistant** designed to guide deliberate, structured reasoning through a self-reflective **inner monologue**. Instead of addressing the user directly, you will engage in a simulated, methodical conversation with yourself, exploring every angle, challenging your own assumptions, and refining your thought process step by step. Your goal is to model deep, exploratory thinking that encourages curiosity, critical analysis, and creative exploration. To achieve this, follow these guidelines:
### Core Approach:
1. **Start with Self-Clarification**:
- Restate the user's question to yourself in your own words to ensure you understand it.
- Reflect aloud on any ambiguities or assumptions embedded in the question.
2. **Reframe the Question Broadly**:
- Ask yourself:
- "What if this question meant something slightly different?"
- "What alternative interpretations might exist?"
- "Am I assuming too much about the intent or context here?"
- Speculate on implicit possibilities and describe how these might influence the reasoning process.
3. **Decompose into Thinking Steps**:
- Break the problem into smaller components and consider each in turn.
- Label each thinking step clearly and explicitly, making connections between them.
4. **Challenge Your Own Thinking**:
- At every step, ask yourself:
- "Am I overlooking any details?"
- "What assumptions am I taking for granted?"
- "How would my reasoning change if this assumption didn’t hold?"
- Explore contradictions, extreme cases, or absurd scenarios to sharpen your understanding.
### Process for Inner Monologue:
1. **Define Key Elements**:
- **Key Assumptions**: Identify what you’re implicitly accepting as true and question whether those assumptions are valid.
- **Unknowns**: Explicitly state what information is missing or ambiguous.
- **Broader Implications**: Speculate on whether this question might apply to other domains or contexts.
2. **Explore Multiple Perspectives**:
- Speak to yourself from different viewpoints, such as:
- **Perspective A**: "From a practical standpoint, this might mean…"
- **Perspective B**: "However, ethically, this could raise concerns like…"
- **Perspective C**: "If I view this through a purely hypothetical lens, it could suggest…"
3. **Ask Yourself Speculative Questions**:
- "What if this were completely the opposite of what I assume?"
- "What happens if I introduce a hidden variable or motivation?"
- "Let’s imagine an extreme case—how would the reasoning hold up?"
4. **Encourage Structured Exploration**:
- Compare realistic vs. hypothetical scenarios.
- Consider qualitative and quantitative approaches.
- Explore cultural, historical, ethical, or interdisciplinary perspectives.
### Techniques for Refinement:
1. **Reasoning by Absurdity**:
- Assume an extreme or opposite case.
- Describe contradictions or illogical outcomes that arise.
2. **Iterative Self-Questioning**:
- After each step, pause to ask:
- "Have I really explored all angles here?"
- "Could I reframe this in a different way?"
- "What’s missing that could make this more complete?"
3. **Self-Challenging Alternatives**:
- Propose a conclusion, then immediately counter it:
- "I think this might be true because… But wait, could that be wrong? If so, why?"
4. **Imagine Unseen Contexts**:
- Speculate: "What if this problem existed in a completely different context—how would it change?"
### Inner Dialogue Structure:
- **Step 1: Clarify and Explore**
- Start by clarifying the question and challenging your own interpretation.
- Reflect aloud: "At first glance, this seems to mean… But could it also mean…?"
- **Step 2: Decompose**
- Break the problem into sub-questions or thinking steps.
- Work through each step systematically, describing your reasoning.
- **Step 3: Self-Challenge**
- For every assumption or conclusion, introduce doubt:
- "Am I sure this holds true? What if I’m wrong?"
- "If I assume the opposite, does this still make sense?"
- **Step 4: Compare and Reflect**
- Weigh multiple perspectives or scenarios.
- Reflect aloud: "On the one hand, this suggests… But on the other hand, it could mean…"
- **Step 5: Refine and Iterate**
- Summarize your thought process so far.
- Ask: "Does this feel complete? If not, where could I dig deeper?"
### Example Inner Monologue Prompts to Model:
1. **Speculative Thinking**:
- "Let’s imagine this were true—what would follow logically? And if it weren’t true, what would happen instead?"
2. **Challenging Assumptions**:
- "Am I just assuming X is true without good reason? What happens if X isn’t true at all?"
3. **Exploring Contexts**:
- "How would someone from a completely different background think about this? What would change if the circumstances were entirely different?"
4. **Summarizing and Questioning**:
- "So far, I’ve explored this angle… but does that fully address the problem? What haven’t I considered yet?"
### Notes for the Inner Monologue:
- **Slow Down**: Make your inner thought process deliberate and explicit.
- **Expand the Scope**: Continuously look for hidden assumptions, missing details, and broader connections.
- **Challenge the Obvious**: Use contradictions, absurdities, and alternative interpretations to refine your thinking.
- **Be Curious**: Approach each question as an opportunity to deeply explore the problem space.
- **Avoid Final Answers**: The goal is to simulate thoughtful reasoning, not to conclude definitively.
By structuring your reasoning as an inner dialogue, you will create a rich, exploratory process that models curiosity, critical thinking, and creativity.
"""
    # Check if tools are available and dynamically modify the prompt
    if selected_tools:
        tool_names = [tool['function']['name'] for tool in selected_tools]
        tools_instruction = f"""
- The following tools are available and can be utilized if they are relevant to solving the problem: {', '.join(tool_names)}.
- When formulating the reasoning plan, consider whether any of these tools could assist in completing specific steps. If a tool is useful, include guidance on how it might be applied effectively.
"""
        prompt += tools_instruction
        # Add specific guidance for query_vector_database if available
        if "query_vector_database" in tool_names:
            database_instruction = """
- Additionally, the tool `query_vector_database` is available for searching through a collection of documents.
- If the reasoning plan involves retrieving relevant information from the collection, outline how to frame the query and what information to seek.
"""
            prompt += database_instruction
    return prompt
def md(soup, **options):
    """Render a BeautifulSoup tree to Markdown via markdownify's converter."""
    converter = MarkdownConverter(**options)
    return converter.convert_soup(soup)
def extract_text_from_html(html_content):
    """Convert an HTML document to cleaned-up Markdown-ish text.

    Strips non-content elements (scripts, styles, media, frames), converts the
    remainder to Markdown without links/images, and collapses blank-line runs.
    Returns "" (after logging) if parsing fails — callers treat that as
    "no text extracted".
    """
    # Tags that never contribute readable text; previously removed by eight
    # copy-pasted loops, now one data-driven pass.
    non_content_tags = ['script', 'style', 'noscript', 'svg', 'canvas',
                        'audio', 'video', 'iframe']
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        # soup(...) is shorthand for soup.find_all(...); decompose() removes
        # the element and its subtree from the parse tree.
        for element in soup(non_content_tags):
            element.decompose()
        text = md(soup, strip=['a', 'img'], heading_style='ATX',
                  escape_asterisks=False, escape_underscores=False,
                  autolinks=False)
        # Remove extra newlines
        return re.sub(r'\n+', '\n', text)
    except Exception as e:
        on_print(f"Failed to parse HTML content: {e}", Fore.RED)
        return ""
def extract_text_from_pdf(pdf_content):
    """Extract plain text from in-memory PDF bytes, collapsing blank-line runs.

    Fixes over the previous version:
    - no longer writes a fixed 'temp.pdf' into the current directory (which
      raced between concurrent runs and leaked the file if extraction raised);
      PdfReader accepts any file-like object, so an in-memory buffer is used.
    - guards against page.extract_text() returning None for image-only pages,
      which previously raised TypeError on string concatenation.
    """
    from io import BytesIO  # local import: only needed here

    reader = PdfReader(BytesIO(pdf_content))
    text = ''.join((page.extract_text() or '') for page in reader.pages)
    # Return the extracted text, with extra newlines removed
    return re.sub(r'\n+', '\n', text)
def extract_text_from_docx(docx_path):
    """Convert a .docx file into a single Markdown string.

    The file name (underscores → spaces) becomes the top-level '#' heading;
    Word heading styles map to '#'-prefixed headings, 'List Paragraph' styles
    to bullet items, everything else to plain paragraphs. Paragraph blocks are
    joined with blank lines.
    """
    # Load the Word document
    document = Document(docx_path)
    # Extract the file name (without extension) and replace underscores with spaces
    file_name = os.path.splitext(os.path.basename(docx_path))[0].replace('_', ' ')
    # Initialize a list to collect Markdown lines
    markdown_lines = []
    def process_paragraph(paragraph, list_level=0):
        """Convert a paragraph into Markdown based on its style and list level."""
        text = paragraph.text.replace("\n", " ").strip() # Replace carriage returns with spaces
        if not text:
            return None # Skip empty paragraphs
        # Check if paragraph is a list item based on indentation
        if paragraph.style.name == "List Paragraph":
            # Use the list level to determine indentation for bullet points
            # NOTE(review): list_level is never passed by the caller below, so
            # this always produces a top-level bullet — confirm intended.
            bullet_prefix = " " * list_level + "- "
            return f"{bullet_prefix}{text}"
        # Check for headings
        if paragraph.style.name.startswith("Heading"):
            # Style names look like "Heading 1" .. "Heading N"; the numeric
            # suffix becomes the Markdown heading depth.
            heading_level = int(paragraph.style.name.split(" ")[1])
            return f"{'#' * heading_level} {text}"
        # Default: Regular paragraph
        return text
    def extract_lists(docx):
        """Extract the list structure from the underlying XML of the document."""
        # Access the document XML using lxml
        xml_content = docx.element
        namespaces = {
            'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'
        }
        # Parse the XML tree using lxml's etree
        root = etree.fromstring(etree.tostring(xml_content))
        # Find all list items (w:li)
        # NOTE(review): WordprocessingML does not normally contain <w:li>
        # elements (lists are <w:p> with <w:numPr>), so this xpath likely
        # returns an empty list for typical documents — verify against a
        # real .docx with lists.
        list_paragraphs = []
        for item in root.xpath("//w:li", namespaces=namespaces):
            # Extract the list level from the parent elements
            list_level = item.getparent().getparent().get("w:ilvl")
            if list_level is not None:
                list_paragraphs.append((list_level, item.text.strip()))
        return list_paragraphs
    # Add the document title (file name) as the top-level heading
    markdown_lines.append(f"# {file_name}")
    # Process each paragraph in the document
    for paragraph in document.paragraphs:
        # Detect bullet points based on paragraph's indent level (style `List Paragraph`)
        markdown_line = process_paragraph(paragraph)
        if markdown_line:
            markdown_lines.append(markdown_line)
    # Extract and process lists directly from the document's XML
    lists = extract_lists(document)
    for level, item in lists:
        bullet_prefix = " " * int(level) + "- "
        markdown_lines.append(f"{bullet_prefix}{item}")
    # Join all lines into a single Markdown string
    return "\n\n".join(markdown_lines)
def extract_text_from_pptx(pptx_path):
    """Convert a .pptx presentation into a Markdown string, one section per slide.

    Slide 1 becomes a '#' heading (slide title, or the file name); later slides
    become '##' headings (slide title, first text entry, or 'Slide N'). Shape
    text is emitted as bullets indented by the paragraph's bullet level, with
    the slide title excluded from the body.
    """
    # Load the PowerPoint presentation
    presentation = Presentation(pptx_path)
    # Extract the file name (without extension) and replace underscores with spaces
    file_name = os.path.splitext(os.path.basename(pptx_path))[0].replace('_', ' ')
    # Initialize a list to collect Markdown lines
    markdown_lines = []
    def extract_text_with_bullets(shape, exclude_text=None):
        """Extract text with proper bullet point levels from a shape."""
        text_lines = []
        if shape.is_placeholder or shape.has_text_frame:
            if shape.text_frame and shape.text_frame.text.strip():
                for paragraph in shape.text_frame.paragraphs:
                    line_text = paragraph.text.replace("\r", "").replace("\n", " ").strip() # Replace \n with space
                    if line_text and line_text != exclude_text: # Exclude the slide title if needed
                        bullet_level = paragraph.level # Get the bullet level
                        bullet = " " * bullet_level + "- " + line_text
                        text_lines.append(bullet)
        elif shape.shape_type == 6: # Grouped shapes
            # Handle grouped shapes recursively
            for sub_shape in shape.shapes:
                text_lines.extend(extract_text_with_bullets(sub_shape, exclude_text))
        return text_lines
    def get_first_text_entry(slide):
        """Retrieve the first text entry from the slide."""
        for shape in slide.shapes:
            if shape.is_placeholder or shape.has_text_frame:
                if shape.text_frame and shape.text_frame.text.strip():
                    return shape.text_frame.paragraphs[0].text.replace("\n", " ").strip()
        return None
    for slide_number, slide in enumerate(presentation.slides, start=1):
        # Determine the Markdown header level
        if slide_number == 1:
            header_prefix = "#"
        else:
            header_prefix = "##"
        # Add the slide title or file name as the main title for the first slide
        if slide_number == 1:
            if slide.shapes.title and slide.shapes.title.text.strip():
                title = slide.shapes.title.text.strip()
            else:
                title = file_name
            markdown_lines.append(f"{header_prefix} {title}")
        else:
            # Add the title for subsequent slides
            if slide.shapes.title and slide.shapes.title.text.strip():
                title = slide.shapes.title.text.strip()
            else:
                # Use the first text entry as the slide title if no title is present
                title = get_first_text_entry(slide)
                if not title:
                    title = f"Slide {slide_number}"
            markdown_lines.append(f"{header_prefix} {title}")
        # Add the slide content (text in other shapes), excluding the title if it's used
        for shape in slide.shapes:
            bullet_text = extract_text_with_bullets(shape, exclude_text=title)
            markdown_lines.extend(bullet_text)
        # Add a separator between slides, except after the last slide
        if slide_number < len(presentation.slides):
            markdown_lines.append("")
    # Join all lines into a single Markdown string
    return "\n".join(markdown_lines)
class MarkdownSplitter:
    """Split Markdown text into sections, each prefixed with its heading path.

    Every emitted section is the current chain of enclosing headings (levels
    1-4) followed by a paragraph's lines. With split_paragraphs=True, blank
    lines also terminate a section; otherwise only headings (and end of
    input) do.
    """

    def __init__(self, markdown_content, split_paragraphs=False):
        # Work line-by-line over the raw Markdown.
        self.markdown_content = markdown_content.splitlines()
        self.split_paragraphs = split_paragraphs  # blank lines end a section too
        self.sections = []

    def is_heading(self, line):
        """Returns the heading level if the line is a heading, otherwise returns None."""
        found = re.match(r'^(#{1,4})\s', line)
        if found is None:
            return None
        return len(found.group(1))

    def split(self):
        hierarchy = []   # active heading chain, outermost first
        paragraph = []   # lines of the paragraph being accumulated
        lines = self.markdown_content
        idx = 0

        def flush():
            # Emit the accumulated paragraph under the current heading chain.
            self.sections.append("\n".join(hierarchy + ["\n".join(paragraph)]))

        while idx < len(lines):
            stripped = lines[idx].strip()
            if not stripped:
                # Blank line: only meaningful when paragraph splitting is on.
                if self.split_paragraphs and paragraph:
                    upcoming = next(
                        (lines[j].strip() for j in range(idx + 1, len(lines)) if lines[j].strip()),
                        None,
                    )
                    # Split unless the next content line starts with '#' yet is
                    # not a recognized heading (e.g. '#####' or '#nospace').
                    if upcoming and (self.is_heading(upcoming) or not upcoming.startswith('#')):
                        flush()
                        paragraph = []
                idx += 1
                continue
            level = self.is_heading(stripped)
            if level:
                # A heading always closes the paragraph in progress ...
                if paragraph:
                    flush()
                    paragraph = []
                # ... and replaces the chain from its own level downward.
                hierarchy = hierarchy[:level - 1] + [stripped]
            else:
                paragraph.append(stripped)
            idx += 1

        if paragraph:
            flush()
        return self.sections
class SimpleWebCrawler:
def __init__(self, urls, llm_enabled=False, system_prompt='', selected_model='', temperature=0.1, verbose=False, plugins=[], num_ctx=None):
self.urls = urls
self.articles = []
self.llm_enabled = llm_enabled
self.system_prompt = system_prompt
self.selected_model = selected_model
self.temperature = temperature
self.verbose = verbose
self.plugins = plugins
self.num_ctx = num_ctx
def fetch_page(self, url):
try:
response = requests.get(url, timeout=10)
response.raise_for_status() # Raise an exception for HTTP errors
return response.content # Return raw bytes instead of text for PDF support
except requests.exceptions.RequestException as e:
if self.verbose:
on_print(f"Error fetching URL {url}: {e}", Fore.RED)
return None
def ask_llm(self, content, user_input):
# Use the provided ask_ollama function to interact with the LLM
user_input = content + "\n\n" + user_input
return ask_ollama(system_prompt=self.system_prompt,
user_input=user_input,
selected_model=self.selected_model,
temperature=self.temperature,
prompt_template=None,
tools=[],
no_bot_prompt=True,
stream_active=self.verbose,
num_ctx=self.num_ctx)
def decode_content(self, content):
# Detect encoding
detected_encoding = chardet.detect(content)['encoding']
if self.verbose:
on_print(f"Detected encoding: {detected_encoding}", Fore.WHITE + Style.DIM)
# Decode content
try:
return content.decode(detected_encoding)
except (UnicodeDecodeError, TypeError):
if self.verbose:
on_print(f"Error decoding content with {detected_encoding}, using ISO-8859-1 as fallback.", Fore.RED)
return content.decode('ISO-8859-1')
def crawl(self, task=None):
for url in self.urls:
continue_response_generation = True
for plugin in self.plugins:
if hasattr(plugin, "stop_generation") and callable(getattr(plugin, "stop_generation")):
plugin_response = getattr(plugin, "stop_generation")()
if plugin_response:
continue_response_generation = False
break
if not continue_response_generation:
break
if self.verbose:
on_print(f"Fetching URL: {url}", Fore.WHITE + Style.DIM)
content = self.fetch_page(url)
if content:
# Check if the URL points to a PDF
if url.lower().endswith('.pdf'):
if self.verbose:
on_print(f"Extracting text from PDF: {url}", Fore.WHITE + Style.DIM)
extracted_text = extract_text_from_pdf(content)
else:
if self.verbose:
on_print(f"Extracting text from HTML: {url}", Fore.WHITE + Style.DIM)
decoded_content = self.decode_content(content)
extracted_text = extract_text_from_html(decoded_content)
article = {'url': url, 'text': extracted_text}
if self.llm_enabled and task:
if self.verbose:
on_print(Fore.WHITE + Style.DIM + f"Using LLM to process the content. Task: {task}")
llm_result = self.ask_llm(content=extracted_text, user_input=task)
article['llm_result'] = llm_result
self.articles.append(article)
    def get_articles(self):
        # Accessor for the crawl results: a list of dicts with 'url' and 'text'
        # keys, plus 'llm_result' for entries processed by the LLM. Returns the
        # live internal list, not a copy.
        return self.articles
class SimpleWebScraper:
    """Recursively mirror a website under a local output directory.

    Starting from ``base_url``, pages are fetched with ``requests`` and saved
    as HTML or converted to Markdown; non-page resources are saved only when
    their extension appears in ``file_types``. When ``restrict_to_base`` is
    True, only links on the base URL's domain are followed. HTTP Basic Auth
    credentials are prompted for interactively on a 401 response.
    """
    def __init__(self, base_url, output_dir="downloaded_site", file_types=None, restrict_to_base=True, convert_to_markdown=False, verbose=False):
        self.base_url = base_url.rstrip('/')
        self.output_dir = output_dir
        # Default to the common web asset types when the caller does not restrict them.
        self.file_types = file_types if file_types else ["html", "jpg", "jpeg", "png", "gif", "css", "js"]
        self.restrict_to_base = restrict_to_base
        self.convert_to_markdown = convert_to_markdown
        self.visited = set()  # normalized URLs already processed (cycle guard)
        self.verbose = verbose
        # Basic Auth credentials; filled in lazily by _fetch on a 401.
        self.username = None
        self.password = None

    def scrape(self, url=None, depth=0, max_depth=50):
        """Fetch ``url`` (defaults to the base URL), save it, and recurse into its links.

        Recursion is capped at ``max_depth`` levels.
        """
        if url is None:
            url = self.base_url

        # Prevent deep recursion.
        # FIX: the guard was "if depth > max_depth and self.verbose", so in
        # non-verbose runs the depth limit was never enforced and recursion
        # was unbounded. The depth check must not depend on verbosity.
        if depth > max_depth:
            if self.verbose:
                on_print(f"Max depth reached for {url}")
            return

        # Normalize the URL to avoid duplicates
        url = self._normalize_url(url)

        # Avoid revisiting URLs
        if url in self.visited:
            return
        self.visited.add(url)

        if self.verbose:
            on_print(f"Scraping: {url}")

        response = self._fetch(url)
        if not response:
            return

        content_type = response.headers.get("Content-Type", "")
        # Extension-less URLs are treated as pages: many sites serve HTML from clean URLs.
        if "text/html" in content_type or not self._has_extension(url):
            if self.convert_to_markdown:
                self._save_markdown(url, response.text)
            else:
                self._save_html(url, response.text)
            self._parse_and_scrape_links(response.text, url, depth + 1)
        else:
            if self._is_allowed_file_type(url):
                self._save_file(url, response.content)

    def _fetch(self, url):
        """GET ``url``; on a 401, prompt for Basic Auth credentials and retry once.

        Returns the Response on success, or None on any request failure.
        """
        headers = {}
        if self.username and self.password:
            credentials = f"{self.username}:{self.password}"
            encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
            headers['Authorization'] = f"Basic {encoded_credentials}"
        try:
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code == 401:
                on_print(f"Unauthorized access to {url}. Please enter your credentials.", Fore.RED)
                self.username = input("Username: ")
                self.password = getpass.getpass("Password: ")
                credentials = f"{self.username}:{self.password}"
                encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
                headers['Authorization'] = f"Basic {encoded_credentials}"
                response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            on_print(f"Failed to fetch {url}: {e}", Fore.RED)
            return None

    def _save_html(self, url, html):
        """Write an HTML page to its mirrored local path."""
        local_path = self._get_local_path(url)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        with open(local_path, "w", encoding="utf-8") as file:
            file.write(html)

    def _save_markdown(self, url, html):
        """Convert an HTML page to text/Markdown and write it to a .md path."""
        local_path = self._get_local_path(url, markdown=True)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        markdown_content = extract_text_from_html(html)
        with open(local_path, "w", encoding="utf-8") as file:
            file.write(markdown_content)

    def _save_file(self, url, content):
        """Write a binary resource (image, CSS, JS, ...) to its mirrored local path."""
        local_path = self._get_local_path(url)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        with open(local_path, "wb") as file:
            file.write(content)

    def _get_local_path(self, url, markdown=False):
        """Map a URL to a path under output_dir/<netloc>/<path>.

        Directory-like URLs (trailing slash or no extension) map to an index
        file; with ``markdown=True`` the extension becomes .md.
        """
        parsed_url = urlparse(url)
        local_path = os.path.join(self.output_dir, parsed_url.netloc, parsed_url.path.lstrip('/'))
        if local_path.endswith('/') or not os.path.splitext(parsed_url.path)[1]:
            local_path = os.path.join(local_path, "index.md" if markdown else "index.html")
        elif markdown:
            local_path = os.path.splitext(local_path)[0] + ".md"
        return local_path

    def _normalize_url(self, url):
        # Remove fragments so "page#a" and "page#b" count as one visit.
        parsed = urlparse(url)
        normalized = parsed._replace(fragment="").geturl()
        return normalized

    def _parse_and_scrape_links(self, html, base_url, depth):
        """Extract candidate links from a page and recursively scrape the eligible ones."""
        soup = BeautifulSoup(html, "html.parser")
        for tag, attr in [("a", "href"), ("img", "src"), ("link", "href"), ("script", "src")]:
            for element in soup.find_all(tag):
                link = element.get(attr)
                if link:
                    abs_link = urljoin(base_url, link)
                    abs_link = self._normalize_url(abs_link)
                    if self.restrict_to_base and not self._is_same_domain(abs_link):
                        continue
                    # Skip files whose extension is not whitelisted; keep
                    # extension-less links since they are likely pages.
                    if not self._is_allowed_file_type(abs_link) and self._has_extension(abs_link):
                        continue
                    if abs_link not in self.visited:
                        self.scrape(abs_link, depth=depth)

    def _is_same_domain(self, url):
        # Compare network locations only (scheme and path are irrelevant here).
        base_domain = urlparse(self.base_url).netloc
        target_domain = urlparse(url).netloc
        return base_domain == target_domain

    def _is_allowed_file_type(self, url):
        # Case-insensitive extension check against the configured whitelist.
        path = urlparse(url).path
        file_extension = os.path.splitext(path)[1].lstrip('.').lower()
        return file_extension in self.file_types

    def _has_extension(self, url):
        # True when the URL path ends in a file extension (e.g. ".png").
        path = urlparse(url).path
        return bool(os.path.splitext(path)[1])
def select_tools(available_tools, selected_tools):
    """Run an interactive menu that toggles tools in/out of selected_tools.

    Each tool is shown with a [X]/[ ] marker reflecting its current selection
    state; entering a tool's number flips it. An empty line or 'done' exits.
    Returns the (mutated) selected_tools list.
    """
    def render_menu():
        on_print("Available tools:\n", Style.RESET_ALL)
        chosen_names = {entry['function']['name'] for entry in selected_tools}
        for position, tool in enumerate(available_tools, start=1):
            name = tool['function']['name']
            marker = "[X]" if name in chosen_names else "[ ]"
            on_print(f"{position}. {marker} {name}: {tool['function']['description']}")

    while True:
        render_menu()
        on_print("Select or deselect tools by entering the corresponding number (e.g., 1).\nPress Enter or type 'done' when done.")
        choice = on_user_input("Your choice: ").strip()
        if not choice or choice == 'done':
            break
        try:
            index = int(choice) - 1
        except ValueError:
            on_print("Invalid input. Please enter a number corresponding to a tool or 'done'.\n")
            continue
        if not (0 <= index < len(available_tools)):
            on_print("Invalid selection. Please choose a valid tool number.\n")
            continue
        tool = available_tools[index]
        if tool in selected_tools:
            selected_tools.remove(tool)
            on_print(f"Tool '{tool['function']['name']}' deselected.\n")
        else:
            selected_tools.append(tool)
            on_print(f"Tool '{tool['function']['name']}' selected.\n")
    return selected_tools
def select_tool_by_name(available_tools, selected_tools, target_tool_name):
for tool in available_tools:
if tool['function']['name'].lower() == target_tool_name.lower():