diff --git a/.gitignore b/.gitignore index cd5e94f72..f3671b64f 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,6 @@ **/__pycache__/ - # Ignore all contents of the virtual environment directory .venv/* diff --git a/agent.py b/agent.py index b279a639f..b1e76d739 100644 --- a/agent.py +++ b/agent.py @@ -1,6 +1,8 @@ +import asyncio from dataclasses import dataclass, field import time, importlib, inspect, os, json from typing import Any, Optional, Dict +import uuid from python.helpers import extract_tools, rate_limiter, files, errors from python.helpers.print_style import PrintStyle from langchain.schema import AIMessage @@ -9,11 +11,72 @@ from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.language_models.llms import BaseLLM from langchain_core.embeddings import Embeddings -from concurrent.futures import Future -from python.helpers.log import Log +import python.helpers.log as Log from python.helpers.dirty_json import DirtyJson +from python.helpers.defer import DeferredTask +class AgentContext: + _contexts: dict[str, 'AgentContext'] = {} + _counter: int = 0 + + def __init__(self, config: 'AgentConfig', id:str|None = None, agent0: 'Agent|None' = None): + # build context + self.id = id or str(uuid.uuid4()) + self.config = config + self.log = Log.Log() + self.agent0 = agent0 or Agent(0, self.config, self) + self.paused = False + self.streaming_agent: Agent|None = None + self.process: DeferredTask|None = None + AgentContext._counter += 1 + self.no = AgentContext._counter + + self._contexts[self.id] = self + + @staticmethod + def get(id:str): + return AgentContext._contexts.get(id, None) + + @staticmethod + def first(): + if not AgentContext._contexts: return None + return list(AgentContext._contexts.values())[0] + + + @staticmethod + def remove(id:str): + context = AgentContext._contexts.pop(id, None) + if context and context.process: context.process.kill() + return context + + def reset(self): + if self.process: self.process.kill() + self.log.reset() + self.agent0 = Agent(0, self.config, self) + self.streaming_agent = None + self.paused = False + + + def communicate(self, msg: str, broadcast_level: int = 1): + self.paused=False #unpause if paused + + if self.process and self.process.is_alive(): + if self.streaming_agent: current_agent = self.streaming_agent + else: current_agent = self.agent0 + + # set intervention messages to agent(s): + intervention_agent = current_agent + while intervention_agent and broadcast_level !=0: + intervention_agent.intervention_message = msg + broadcast_level -= 1 + intervention_agent = intervention_agent.data.get("superior",None) + else: + self.process = DeferredTask(self.agent0.message_loop, msg) + + return self.process + + @dataclass class AgentConfig: chat_model: BaseChatModel | BaseLLM @@ -44,18 +107,25 @@ class AgentConfig: code_exec_ssh_user: str = "root" code_exec_ssh_pass: str = "toor" additional: Dict[str, Any] = field(default_factory=dict) - -class Agent: +# intervention exception class - skips rest of message loop iteration +class InterventionException(Exception): + pass + +# killer exception class - not forwarded to LLM, cannot be fixed on its own, ends message loop +class KillerException(Exception): + pass - paused=False - streaming_agent=None +class Agent: - def __init__(self, number:int, config: AgentConfig): + def __init__(self, number:int, config: AgentConfig, context: AgentContext|None = None): # agent config self.config = config + # agent context + self.context = context or AgentContext(config) + # non-config vars self.number = number self.agent_name = f"Agent {self.number}" @@ -63,44 +133,24 @@ def __init__(self, number:int, config: AgentConfig): self.history = [] self.last_message = "" self.intervention_message = "" - self.intervention_status = False - self.rate_limiter = rate_limiter.RateLimiter(max_calls=self.config.rate_limit_requests,max_input_tokens=self.config.rate_limit_input_tokens,max_output_tokens=self.config.rate_limit_output_tokens,window_seconds=self.config.rate_limit_seconds) + self.rate_limiter = rate_limiter.RateLimiter(self.context.log,max_calls=self.config.rate_limit_requests,max_input_tokens=self.config.rate_limit_input_tokens,max_output_tokens=self.config.rate_limit_output_tokens,window_seconds=self.config.rate_limit_seconds) self.data = {} # free data object all the tools can use - self.future: Future|None = None - - - os.chdir(files.get_abs_path("./work_dir")) #change CWD to work_dir - def communicate(self, msg: str): - Agent.paused=False #unpause if paused - - if not self.future or self.future.done(): - return self.message_loop(msg) - else: - if Agent.streaming_agent: current_agent = Agent.streaming_agent - else: current_agent = self - - current_agent.intervention_message = msg #intervene current agent - if self.future: return self.future.result() #wait for original agent - else: return "" - - def message_loop(self, msg: str): + async def message_loop(self, msg: str): try: - self.future = Future() printer = PrintStyle(italic=True, font_color="#b3ffd9", padding=False) user_message = self.read_prompt("fw.user_message.md", message=msg) - self.append_message(user_message, human=True) # Append the user's input to the history - memories = self.fetch_memories(True) + await self.append_message(user_message, human=True) # Append the user's input to the history + memories = await self.fetch_memories(True) while True: # let the agent iterate on his thoughts until he stops by using a tool - Agent.streaming_agent = self #mark self as current streamer + self.context.streaming_agent = self #mark self as current streamer agent_response = "" - self.intervention_status = False # reset interventon status try: system = self.read_prompt("agent.system.md", agent_name=self.agent_name) + "\n\n" + self.read_prompt("agent.tools.md") - memories = self.fetch_memories() + memories = await self.fetch_memories() if memories: system+= "\n\n"+memories prompt = ChatPromptTemplate.from_messages([ @@ -116,10 +166,10 @@ def message_loop(self, msg: str): # output that the agent is starting PrintStyle(bold=True, font_color="green", padding=True, background_color="white").print(f"{self.agent_name}: Generating:") - log = Log(type="agent", heading=f"{self.agent_name}: Generating:") + log = self.context.log.log(type="agent", heading=f"{self.agent_name}: Generating:") - for chunk in chain.stream(inputs): - if self.handle_intervention(agent_response): break # wait for intervention and handle it, if paused + async for chunk in chain.astream(inputs): + await self.handle_intervention(agent_response) # wait for intervention and handle it, if paused if isinstance(chunk, str): content = chunk elif hasattr(chunk, "content"): content = str(chunk.content) @@ -130,34 +180,41 @@ def message_loop(self, msg: str): agent_response += content # concatenate stream into the response self.log_from_stream(agent_response, log) - self.rate_limiter.set_output_tokens(int(len(agent_response)/4)) + self.rate_limiter.set_output_tokens(int(len(agent_response)/4)) # rough estimation - if not self.handle_intervention(agent_response): - if self.last_message == agent_response: #if assistant_response is the same as last message in history, let him know - self.append_message(agent_response) # Append the assistant's response to the history - warning_msg = self.read_prompt("fw.msg_repeat.md") - self.append_message(warning_msg, human=True) # Append warning message to the history - PrintStyle(font_color="orange", padding=True).print(warning_msg) - Log.log(type="warning", content=warning_msg) - - else: #otherwise proceed with tool - self.append_message(agent_response) # Append the assistant's response to the history - tools_result = self.process_tools(agent_response) # process tools requested in agent message - if tools_result: #final response of message loop available - self.future.set_result(tools_result) #set result to future - return tools_result #break the execution if the task is done - - # Forward errors to the LLM, maybe it can fix them - except Exception as e: + await self.handle_intervention(agent_response) + + if self.last_message == agent_response: #if assistant_response is the same as last message in history, let him know + await self.append_message(agent_response) # Append the assistant's response to the history + warning_msg = self.read_prompt("fw.msg_repeat.md") + await self.append_message(warning_msg, human=True) # Append warning message to the history + PrintStyle(font_color="orange", padding=True).print(warning_msg) + self.context.log.log(type="warning", content=warning_msg) + + else: #otherwise proceed with tool + await self.append_message(agent_response) # Append the assistant's response to the history + tools_result = await self.process_tools(agent_response) # process tools requested in agent message + if tools_result: #final response of message loop available + return tools_result #break the execution if the task is done + + except InterventionException as e: + pass # intervention message has been handled in handle_intervention(), proceed with conversation loop + except asyncio.CancelledError as e: + PrintStyle(font_color="white", background_color="red", padding=True).print(f"Context {self.context.id} terminated during message loop") + raise e # process cancelled from outside, kill the loop + except KillerException as e: + error_message = errors.format_error(e) + self.context.log.log(type="error", content=error_message) + raise e # kill the loop + except Exception as e: # Forward other errors to the LLM, maybe it can fix them error_message = errors.format_error(e) msg_response = self.read_prompt("fw.error.md", error=error_message) # error message template - self.append_message(msg_response, human=True) + await self.append_message(msg_response, human=True) PrintStyle(font_color="red", padding=True).print(msg_response) - Log.log(type="error", content=msg_response) - self.future.set_exception(e) #set result to future + self.context.log.log(type="error", content=msg_response) finally: - Agent.streaming_agent = None # unset current streamer + self.context.streaming_agent = None # unset current streamer def read_prompt(self, file:str, **kwargs): content = "" @@ -176,21 +233,21 @@ def get_data(self, field:str): def set_data(self, field:str, value): self.data[field] = value - def append_message(self, msg: str, human: bool = False): + async def append_message(self, msg: str, human: bool = False): message_type = "human" if human else "ai" if self.history and self.history[-1].type == message_type: self.history[-1].content += "\n\n" + msg else: new_message = HumanMessage(content=msg) if human else AIMessage(content=msg) self.history.append(new_message) - self.cleanup_history(self.config.msgs_keep_max, self.config.msgs_keep_start, self.config.msgs_keep_end) + await self.cleanup_history(self.config.msgs_keep_max, self.config.msgs_keep_start, self.config.msgs_keep_end) if message_type=="ai": self.last_message = msg def concat_messages(self,messages): return "\n".join([f"{msg.type}: {msg.content}" for msg in messages]) - def send_adhoc_message(self, system: str, msg: str, output_label:str): + async def send_adhoc_message(self, system: str, msg: str, output_label:str): prompt = ChatPromptTemplate.from_messages([ SystemMessage(content=system), HumanMessage(content=msg)]) @@ -203,13 +260,13 @@ def send_adhoc_message(self, system: str, msg: str, output_label:str): if output_label: PrintStyle(bold=True, font_color="orange", padding=True, background_color="white").print(f"{self.agent_name}: {output_label}:") printer = PrintStyle(italic=True, font_color="orange", padding=False) - logger = Log(type="adhoc", heading=f"{self.agent_name}: {output_label}:") + logger = self.context.log.log(type="adhoc", heading=f"{self.agent_name}: {output_label}:") formatted_inputs = prompt.format() tokens = int(len(formatted_inputs)/4) self.rate_limiter.limit_call_and_input(tokens) - for chunk in chain.stream({}): + async for chunk in chain.astream({}): if self.handle_intervention(): break # wait for intervention and handle it, if paused if isinstance(chunk, str): content = chunk @@ -228,13 +285,13 @@ def get_last_message(self): if self.history: return self.history[-1] - def replace_middle_messages(self,middle_messages): + async def replace_middle_messages(self,middle_messages): cleanup_prompt = self.read_prompt("fw.msg_cleanup.md") - summary = self.send_adhoc_message(system=cleanup_prompt,msg=self.concat_messages(middle_messages), output_label="Mid messages cleanup summary") + summary = await self.send_adhoc_message(system=cleanup_prompt,msg=self.concat_messages(middle_messages), output_label="Mid messages cleanup summary") new_human_message = HumanMessage(content=summary) return [new_human_message] - def cleanup_history(self, max:int, keep_start:int, keep_end:int): + async def cleanup_history(self, max:int, keep_start:int, keep_end:int): if len(self.history) <= max: return self.history @@ -254,48 +311,44 @@ def cleanup_history(self, max:int, keep_start:int, keep_end:int): middle_part = middle_part[:-1] # Replace the middle part using the replacement function - new_middle_part = self.replace_middle_messages(middle_part) + new_middle_part = await self.replace_middle_messages(middle_part) self.history = first_x + new_middle_part + last_y return self.history - def handle_intervention(self, progress:str="") -> bool: - while self.paused: time.sleep(0.1) # wait if paused - if self.intervention_message and not self.intervention_status: # if there is an intervention message, but not yet processed - if progress.strip(): self.append_message(progress) # append the response generated so far - user_msg = self.read_prompt("fw.intervention.md", user_message=self.intervention_message) # format the user intervention template - self.append_message(user_msg,human=True) # append the intervention message + async def handle_intervention(self, progress:str=""): + while self.context.paused: await asyncio.sleep(0.1) # wait if paused + if self.intervention_message: # if there is an intervention message, but not yet processed + msg = self.intervention_message self.intervention_message = "" # reset the intervention message - self.intervention_status = True - return self.intervention_status # return intervention status + if progress.strip(): await self.append_message(progress) # append the response generated so far + user_msg = self.read_prompt("fw.intervention.md", user_message=self.intervention_message) # format the user intervention template + await self.append_message(user_msg,human=True) # append the intervention message + raise InterventionException(msg) - def process_tools(self, msg: str): + async def process_tools(self, msg: str): # search for tool usage requests in agent message tool_request = extract_tools.json_parse_dirty(msg) if tool_request is not None: tool_name = tool_request.get("tool_name", "") tool_args = tool_request.get("tool_args", {}) - - tool = self.get_tool( - tool_name, - tool_args, - msg) + tool = self.get_tool(tool_name, tool_args, msg) - if self.handle_intervention(): return # wait if paused and handle intervention message if needed - tool.before_execution(**tool_args) - if self.handle_intervention(): return # wait if paused and handle intervention message if needed - response = tool.execute(**tool_args) - if self.handle_intervention(): return # wait if paused and handle intervention message if needed - tool.after_execution(response) - if self.handle_intervention(): return # wait if paused and handle intervention message if needed + await self.handle_intervention() # wait if paused and handle intervention message if needed + await tool.before_execution(**tool_args) + await self.handle_intervention() # wait if paused and handle intervention message if needed + response = await tool.execute(**tool_args) + await self.handle_intervention() # wait if paused and handle intervention message if needed + await tool.after_execution(response) + await self.handle_intervention() # wait if paused and handle intervention message if needed if response.break_loop: return response.message else: msg = self.read_prompt("fw.msg_misformat.md") - self.append_message(msg, human=True) + await self.append_message(msg, human=True) PrintStyle(font_color="red", padding=True).print(msg) - Log.log(type="error", content=f"{self.agent_name}: Message misformat:") + self.context.log.log(type="error", content=f"{self.agent_name}: Message misformat:") def get_tool(self, name: str, args: dict, message: str, **kwargs): @@ -314,7 +367,7 @@ def get_tool(self, name: str, args: dict, message: str, **kwargs): return tool_class(agent=self, name=name, args=args, message=message, **kwargs) - def fetch_memories(self,reset_skip=False): + async def fetch_memories(self,reset_skip=False): if self.config.auto_memory_count<=0: return "" if reset_skip: self.memory_skip_counter = 0 @@ -331,14 +384,14 @@ def fetch_memories(self,reset_skip=False): "raw_memories": memories } cleanup_prompt = self.read_prompt("msg.memory_cleanup.md").replace("{", "{{") - clean_memories = self.send_adhoc_message(cleanup_prompt,json.dumps(input), output_label="Memory injection") + clean_memories = await self.send_adhoc_message(cleanup_prompt,json.dumps(input), output_label="Memory injection") return clean_memories - def log_from_stream(self, stream: str, log: Log): + def log_from_stream(self, stream: str, logItem: Log.LogItem): try: if len(stream) < 25: return # no reason to try response = DirtyJson.parse_string(stream) - if isinstance(response, dict): log.update(content=stream, kvps=response) #log if result is a dictionary already + if isinstance(response, dict): logItem.update(content=stream, kvps=response) #log if result is a dictionary already except Exception as e: pass diff --git a/initialize.py b/initialize.py index 2e452d5f1..fc6bac5d2 100644 --- a/initialize.py +++ b/initialize.py @@ -1,5 +1,5 @@ import models -from agent import Agent, AgentConfig +from agent import AgentConfig def initialize(): @@ -52,9 +52,6 @@ def initialize(): # code_exec_ssh_pass = "toor", # additional = {}, ) - - # create the first agent - agent0 = Agent( number = 0, config = config ) - # return initialized agent - return agent0 \ No newline at end of file + # return config object + return config \ No newline at end of file diff --git a/python/helpers/defer.py b/python/helpers/defer.py new file mode 100644 index 000000000..2d26e77a5 --- /dev/null +++ b/python/helpers/defer.py @@ -0,0 +1,61 @@ +import asyncio +import threading +from concurrent.futures import Future + +class DeferredTask: + def __init__(self, func, *args, **kwargs): + self._loop = asyncio.new_event_loop() + # self._thread = None + self._task = None + self._future = Future() + self._start_task(func, *args, **kwargs) + + def _start_task(self, func, *args, **kwargs): + def run_in_thread(loop, func, args, kwargs): + asyncio.set_event_loop(loop) + self._task = loop.create_task(self._run(func, *args, **kwargs)) + loop.run_forever() + + self._thread = threading.Thread(target=run_in_thread, args=(self._loop, func, args, kwargs)) + self._thread.start() + + async def _run(self, func, *args, **kwargs): + try: + result = await func(*args, **kwargs) + self._future.set_result(result) + except Exception as e: + self._future.set_exception(e) + finally: + self._loop.call_soon_threadsafe(self._loop.stop) + + def is_ready(self): + return self._future.done() + + async def result(self, timeout=None): + if self._task is None: + raise RuntimeError("Task was not initialized properly.") + + try: + return await asyncio.wait_for(asyncio.wrap_future(self._future), timeout) + except asyncio.TimeoutError: + raise TimeoutError("The task did not complete within the specified timeout.") + + def result_sync(self, timeout=None): + try: + return self._future.result(timeout) + except TimeoutError: + raise TimeoutError("The task did not complete within the specified timeout.") + + def kill(self): + if self._task and not self._task.done(): + self._loop.call_soon_threadsafe(self._task.cancel) + + def is_alive(self): + return self._thread.is_alive() and not self._future.done() + + def __del__(self): + if self._loop.is_running(): + self._loop.call_soon_threadsafe(self._loop.stop) + if self._thread.is_alive(): + self._thread.join() + self._loop.close() \ No newline at end of file diff --git a/python/helpers/docker.py b/python/helpers/docker.py index 02a961c3d..f157dd432 100644 --- a/python/helpers/docker.py +++ b/python/helpers/docker.py @@ -8,7 +8,8 @@ from python.helpers.log import Log class DockerContainerManager: - def __init__(self, image: str, name: str, ports: Optional[dict[str, int]] = None, volumes: Optional[dict[str, dict[str, str]]] = None): + def __init__(self, logger: Log, image: str, name: str, ports: Optional[dict[str, int]] = None, volumes: Optional[dict[str, dict[str, str]]] = None): + self.logger = logger self.image = image self.name = name self.ports = ports @@ -25,9 +26,9 @@ def init_docker(self): err = format_error(e) if ("ConnectionRefusedError(61," in err or "Error while fetching server API version" in err): PrintStyle.hint("Connection to Docker failed. Is docker or Docker Desktop running?") # hint for user - Log.log(type="hint", content="Connection to Docker failed. Is docker or Docker Desktop running?") + self.logger.log(type="hint", content="Connection to Docker failed. Is docker or Docker Desktop running?") PrintStyle.error(err) - Log.log(type="error", content=err) + self.logger.log(type="error", content=err) time.sleep(5) # try again in 5 seconds else: raise return self.client @@ -38,10 +39,10 @@ def cleanup_container(self) -> None: self.container.stop() self.container.remove() print(f"Stopped and removed the container: {self.container.id}") - Log.log(type="info", content=f"Stopped and removed the container: {self.container.id}") + self.logger.log(type="info", content=f"Stopped and removed the container: {self.container.id}") except Exception as e: print(f"Failed to stop and remove the container: {e}") - Log.log(type="error", content=f"Failed to stop and remove the container: {e}") + self.logger.log(type="error", content=f"Failed to stop and remove the container: {e}") def start_container(self) -> None: @@ -55,7 +56,7 @@ def start_container(self) -> None: if existing_container: if existing_container.status != 'running': print(f"Starting existing container: {self.name} for safe code execution...") - Log.log(type="info", content=f"Starting existing container: {self.name} for safe code execution...") + self.logger.log(type="info", content=f"Starting existing container: {self.name} for safe code execution...") existing_container.start() self.container = existing_container @@ -66,7 +67,7 @@ def start_container(self) -> None: # print(f"Container with name '{self.name}' is already running with ID: {existing_container.id}") else: print(f"Initializing docker container {self.name} for safe code execution...") - Log.log(type="info", content=f"Initializing docker container {self.name} for safe code execution...") + self.logger.log(type="info", content=f"Initializing docker container {self.name} for safe code execution...") self.container = self.client.containers.run( self.image, @@ -77,5 +78,5 @@ def start_container(self) -> None: ) atexit.register(self.cleanup_container) print(f"Started container with ID: {self.container.id}") - Log.log(type="info", content=f"Started container with ID: {self.container.id}") + self.logger.log(type="info", content=f"Started container with ID: {self.container.id}") time.sleep(5) # this helps to get SSH ready diff --git a/python/helpers/errors.py b/python/helpers/errors.py index 0cacec77e..7dd96ea1d 100644 --- a/python/helpers/errors.py +++ b/python/helpers/errors.py @@ -1,6 +1,12 @@ import re import traceback +import asyncio +def handle_error(e: Exception): + # if asyncio.CancelledError, re-raise + if isinstance(e, asyncio.CancelledError): + raise e + def format_error(e: Exception, max_entries=2): traceback_text = traceback.format_exc() # Split the traceback into lines diff --git a/python/helpers/knowledge_import.py b/python/helpers/knowledge_import.py index 2d08ca8ef..ca748d674 100644 --- a/python/helpers/knowledge_import.py +++ b/python/helpers/knowledge_import.py @@ -28,7 +28,7 @@ def calculate_checksum(file_path: str) -> str: hasher.update(buf) return hasher.hexdigest() -def load_knowledge(knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dict[str, KnowledgeImport]: +def load_knowledge(logger: Log, knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dict[str, KnowledgeImport]: knowledge_dir = files.get_abs_path(knowledge_dir) @@ -49,7 +49,7 @@ def load_knowledge(knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dic kn_files = glob.glob(knowledge_dir + '/**/*', recursive=True) if kn_files: print(f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing...") - Log.log(type="info", content=f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing...") + logger.log(type="info", content=f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing...") for file_path in kn_files: ext = file_path.split('.')[-1].lower() @@ -83,5 +83,5 @@ def load_knowledge(knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dic index[file_key]['state'] = 'removed' print(f"Processed {cnt_docs} documents from {cnt_files} files.") - Log.log(type="info", content=f"Processed {cnt_docs} documents from {cnt_files} files.") + logger.log(type="info", content=f"Processed {cnt_docs} documents from {cnt_files} files.") return index diff --git a/python/helpers/log.py b/python/helpers/log.py index f3fa316a6..96fdfcbde 100644 --- a/python/helpers/log.py +++ b/python/helpers/log.py @@ -1,58 +1,78 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field +import json from typing import Optional, Dict import uuid + @dataclass class LogItem: + log: 'Log' no: int type: str heading: str content: str kvps: Optional[Dict] = None - + guid: str = "" + + def __post_init__(self): + self.guid = self.log.guid + + def update(self, type: str | None = None, heading: str | None = None, content: str | None = None, kvps: dict | None = None): + if self.guid == self.log.guid: + self.log.update_item(self.no, type=type, heading=heading, content=content, kvps=kvps) + + def output(self): + return { + "no": self.no, + "type": self.type, + "heading": self.heading, + "content": self.content, + "kvps": self.kvps + } class Log: - guid = uuid.uuid4() - version: int = 0 - last_updated: int = 0 - logs: list = [] - - def __init__(self, type: str="placeholder", heading: str="", content: str="", kvps: dict|None = None): - self.item = Log.log(type, heading, content, kvps) # create placeholder log item that will be updated - - def update(self, type: Optional[str] = None, heading: str|None = None, content: str|None = None, kvps: dict|None = None): - Log.edit(self.item.no, type=type, heading=heading, content=content, kvps=kvps) - - @staticmethod - def reset(): - Log.guid = uuid.uuid4() - Log.version = 0 - Log.last_updated = 0 - Log.logs = [] - - @staticmethod - def log(type: str, heading: str|None = None, content: str|None = None, kvps: dict|None = None): - item = LogItem(len(Log.logs), type, heading or "", content or "", kvps) - Log.logs.append(item) - Log.last_updated = item.no - Log.version += 1 + def __init__(self): + self.guid: str = str(uuid.uuid4()) + self.updates: list[int] = [] + self.logs: list[LogItem] = [] + + def log(self, type: str, heading: str | None = None, content: str | None = None, kvps: dict | None = None) -> LogItem: + item = LogItem(log=self,no=len(self.logs), type=type, heading=heading or "", content=content or "", kvps=kvps) + self.logs.append(item) + self.updates += [item.no] return item - - @staticmethod - def edit(no: int, type: Optional[str] = None, heading: str|None = None, content: str|None = None, kvps: dict|None = None): - if 0 <= no < len(Log.logs): - item = Log.logs[no] - if type is not None: - item.type = type - if heading is not None: - item.heading = heading - if content is not None: - item.content = content - if kvps is not None: - item.kvps = kvps - - Log.last_updated = no - Log.version += 1 - else: - raise IndexError("Log item number out of range") + + def update_item(self, no: int, type: str | None = None, heading: str | None = None, content: str | None = None, kvps: dict | None = None): + item = self.logs[no] + if type is not None: + item.type = type + if heading is not None: + item.heading = heading + if content is not None: + item.content = content + if kvps is not None: + item.kvps = kvps + self.updates += [item.no] + + def output(self, start=None, end=None): + if start is None: + start = 0 + if end is None: + end = len(self.updates) + + out = [] + seen = set() + for update in self.updates[start:end]: + if update not in seen: + out.append(self.logs[update].output()) + seen.add(update) + + return out + + + + def reset(self): + self.guid = str(uuid.uuid4()) + self.updates = [] + self.logs = [] diff --git a/python/helpers/rate_limiter.py b/python/helpers/rate_limiter.py index ee5fb6896..f28bdc98d 100644 --- a/python/helpers/rate_limiter.py +++ b/python/helpers/rate_limiter.py @@ -12,7 +12,8 @@ class CallRecord: output_tokens: int = 0 # Default to 0, will be set separately class RateLimiter: - def __init__(self, max_calls: int, max_input_tokens: int, max_output_tokens: int, window_seconds: int = 60): + def __init__(self, logger: Log, max_calls: int, max_input_tokens: int, max_output_tokens: int, window_seconds: int = 60): + self.logger = logger self.max_calls = max_calls self.max_input_tokens = max_input_tokens self.max_output_tokens = max_output_tokens @@ -49,7 +50,7 @@ def _wait_if_needed(self, current_time: float, new_input_tokens: int): wait_time = oldest_record.timestamp + self.window_seconds - current_time if wait_time > 0: PrintStyle(font_color="yellow", padding=True).print(f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds due to: {', '.join(wait_reasons)}") - Log.log("rate_limit","Rate limit exceeded",f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds due to: {', '.join(wait_reasons)}") + self.logger.log("rate_limit","Rate limit exceeded",f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds due to: {', '.join(wait_reasons)}") time.sleep(wait_time) current_time = time.time() diff --git a/python/helpers/shell_local.py b/python/helpers/shell_local.py index 8f389b764..50d1ba57e 100644 --- a/python/helpers/shell_local.py +++ b/python/helpers/shell_local.py @@ -9,7 +9,7 @@ def __init__(self): self.process = None self.full_output = '' - def connect(self): + async def connect(self): # Start a new subprocess with the appropriate shell for the OS if sys.platform.startswith('win'): # Windows @@ -44,7 +44,7 @@ def send_command(self, command: str): self.process.stdin.write(command + '\n') # type: ignore self.process.stdin.flush() # type: ignore - def read_output(self) -> Tuple[str, Optional[str]]: + async def read_output(self) -> Tuple[str, Optional[str]]: if not self.process: raise Exception("Shell not connected") diff --git a/python/helpers/shell_ssh.py b/python/helpers/shell_ssh.py index c6a8886b0..3e598c7d6 100644 --- a/python/helpers/shell_ssh.py +++ b/python/helpers/shell_ssh.py @@ -1,7 +1,8 @@ +import asyncio import paramiko import time import re -from typing import Optional, Tuple +from typing import Tuple from python.helpers.log import Log from python.helpers.strings import calculate_valid_match_lengths @@ -10,7 +11,8 @@ class SSHInteractiveSession: # end_comment = "# @@==>> SSHInteractiveSession End-of-Command <<==@@" # ps1_label = "SSHInteractiveSession CLI>" - def __init__(self, hostname: str, port: int, username: str, password: str): + def __init__(self, logger: Log, hostname: str, port: int, username: str, password: str): + self.logger = logger self.hostname = hostname self.port = port self.username = username @@ -23,7 +25,7 @@ def __init__(self, hostname: str, port: int, username: str, password: str): self.trimmed_command_length = 0 # Initialize trimmed_command_length - def connect(self): + async def connect(self): # try 3 times with wait and then except errors = 0 while True: @@ -33,14 +35,14 @@ def connect(self): # self.shell.send(f'PS1="{SSHInteractiveSession.ps1_label}"'.encode()) # return while True: # wait for end of initial output - full, part = self.read_output() + full, part = await self.read_output() if full and not part: return time.sleep(0.1) except Exception as e: errors += 1 if errors < 3: print(f"SSH Connection attempt {errors}...") - Log.log(type="info", content=f"SSH Connection attempt {errors}...") + self.logger.log(type="info", content=f"SSH Connection attempt {errors}...") time.sleep(5) else: @@ -64,11 +66,12 @@ def send_command(self, command: str): self.trimmed_command_length = 0 self.shell.send(self.last_command) - def read_output(self) -> Tuple[str, str]: + async def read_output(self) -> Tuple[str, str]: if not self.shell: raise Exception("Shell not connected") partial_output = b'' + leftover = b'' while self.shell.recv_ready(): data = self.shell.recv(1024) @@ -76,17 +79,25 @@ def read_output(self) -> Tuple[str, str]: # Trim own command from output if self.last_command and len(self.last_command) > self.trimmed_command_length: command_to_trim = self.last_command[self.trimmed_command_length:] - + data_to_trim = leftover + data + trim_com, trim_out = calculate_valid_match_lengths( - command_to_trim, data, deviation_threshold=8, deviation_reset=2, - ignore_patterns=[rb'\x1b\[\?\d{4}[a-zA-Z](?:> )?', rb'\r', rb'>']) + command_to_trim, data_to_trim, deviation_threshold=8, deviation_reset=2, + ignore_patterns = [ + rb'\x1b\[\?\d{4}[a-zA-Z](?:> )?', # ANSI escape sequences + rb'\r', # Carriage return + rb'>\s', # Greater-than symbol + ], debug=False) + + leftover = b'' if(trim_com > 0 and trim_out > 0): - data = data[trim_out:] + data = data_to_trim[trim_out:] + leftover = data self.trimmed_command_length += trim_com partial_output += data self.full_output += data - time.sleep(0.1) # Prevent busy waiting + await asyncio.sleep(0.1) # Prevent busy waiting # Decode once at the end decoded_partial_output = partial_output.decode('utf-8', errors='replace') diff --git a/python/helpers/strings.py b/python/helpers/strings.py index ae03692a8..1797bc15f 100644 --- a/python/helpers/strings.py +++ b/python/helpers/strings.py @@ -1,61 +1,12 @@ -# def calculate_valid_match_lengths(first: bytes | str, second: bytes | str, deviation_threshold: int = 5, deviation_reset: int = 5) -> tuple[int, int]: -# first_length = len(first) -# second_length = len(second) - -# i, j = 0, 0 -# deviations = 0 -# matched_since_deviation = 0 -# last_matched_i, last_matched_j = 0, 0 # Track the last matched index - -# while i < first_length and j < second_length: -# if first[i] == second[j]: -# last_matched_i, last_matched_j = i + 1, j + 1 # Update last matched position -# i += 1 -# j += 1 -# matched_since_deviation += 1 - -# # Reset the deviation counter if we've matched enough characters since the last deviation -# if matched_since_deviation >= deviation_reset: -# deviations = 0 -# matched_since_deviation = 0 -# else: -# # Determine the look-ahead based on the remaining deviation threshold -# look_ahead = deviation_threshold - deviations - -# # Look ahead to find the best match within the remaining deviation allowance -# best_match = None -# for k in range(1, look_ahead + 1): -# if i + k < first_length and first[i + k] == second[j]: -# best_match = ('i', k) -# break -# if j + k < second_length and first[i] == second[j + k]: -# best_match = ('j', k) -# break - -# if best_match: -# if best_match[0] == 'i': -# i += best_match[1] -# elif best_match[0] == 'j': -# j += best_match[1] -# else: -# i += 1 -# j += 1 - -# deviations += 1 -# matched_since_deviation = 0 - -# if deviations > deviation_threshold: -# break - -# # Return the last matched positions instead of the current indices -# return last_matched_i, last_matched_j - import re +import sys +import time def calculate_valid_match_lengths(first: bytes | str, second: bytes | str, deviation_threshold: int = 5, deviation_reset: int = 5, - ignore_patterns: list[bytes|str] = []) -> tuple[int, int]: + ignore_patterns: list[bytes|str] = [], + debug: bool = False) -> tuple[int, int]: first_length = len(first) second_length = len(second) @@ -99,10 +50,10 @@ def skip_ignored_patterns(s, index): # Look ahead to find the best match within the remaining deviation allowance best_match = None for k in range(1, look_ahead + 1): - if i + k < first_length and first[i + k] == second[j]: + if i + k < first_length and j < second_length and first[i + k] == second[j]: best_match = ('i', k) break - if j + k < second_length and first[i] == second[j + k]: + if j + k < second_length and i < first_length and first[i] == second[j + k]: best_match = ('j', k) break @@ -121,5 +72,22 @@ def skip_ignored_patterns(s, index): if deviations > deviation_threshold: break + if debug: + output = ( + f"First (up to {last_matched_i}): {first[:last_matched_i]!r}\n" + "\n" + f"Second (up to {last_matched_j}): {second[:last_matched_j]!r}\n" + "\n" + f"Current deviation: {deviations}\n" + f"Matched since last deviation: {matched_since_deviation}\n" + + "-" * 40 + "\n" + ) + sys.stdout.write("\r" + output) + sys.stdout.flush() + time.sleep(0.01) # Add a short delay for readability (optional) + + # Return the last matched positions instead of the current indices + return last_matched_i, last_matched_j + # Return the last matched positions instead of the current indices return last_matched_i, last_matched_j \ No newline at end of file diff --git a/python/helpers/tool.py b/python/helpers/tool.py index 0481fbc24..112faa2f6 100644 --- a/python/helpers/tool.py +++ b/python/helpers/tool.py @@ -1,14 +1,13 @@ from abc import abstractmethod -from typing import TypedDict +from dataclasses import dataclass from agent import Agent from python.helpers.print_style import PrintStyle -from python.helpers import files, messages -from python.helpers.log import Log +from python.helpers import messages +@dataclass class Response: - def __init__(self, message: str, break_loop: bool) -> None: - self.message = message - self.break_loop = break_loop + message:str + break_loop:bool class Tool: @@ -19,24 +18,22 @@ def __init__(self, agent: Agent, name: str, args: dict[str,str], message: str, * self.message = message @abstractmethod - def execute(self,**kwargs) -> Response: + async def execute(self,**kwargs) -> Response: pass - def before_execution(self, **kwargs): - if self.agent.handle_intervention(): return # wait for intervention and handle it, if paused + async def before_execution(self, **kwargs): PrintStyle(font_color="#1B4F72", padding=True, background_color="white", bold=True).print(f"{self.agent.agent_name}: Using tool '{self.name}':") - self.log = Log(type="tool", heading=f"{self.agent.agent_name}: Using tool '{self.name}':", content="", kvps=self.args) + self.log = self.agent.context.log.log(type="tool", heading=f"{self.agent.agent_name}: Using tool '{self.name}':", content="", kvps=self.args) if self.args and isinstance(self.args, dict): for key, value in self.args.items(): PrintStyle(font_color="#85C1E9", bold=True).stream(self.nice_key(key)+": ") PrintStyle(font_color="#85C1E9", padding=isinstance(value,str) and "\n" in value).stream(value) PrintStyle().print() - def after_execution(self, response: Response, **kwargs): + async def after_execution(self, response: Response, **kwargs): text = messages.truncate_text(self.agent, response.message.strip(), self.agent.config.max_tool_response_length) msg_response = self.agent.read_prompt("fw.tool_response.md", tool_name=self.name, tool_response=text) - if self.agent.handle_intervention(): return # wait for intervention and handle it, if paused - self.agent.append_message(msg_response, human=True) + await self.agent.append_message(msg_response, human=True) PrintStyle(font_color="#1B4F72", background_color="white", padding=True, bold=True).print(f"{self.agent.agent_name}: Response from tool '{self.name}':") PrintStyle(font_color="#85C1E9").print(response.message) self.log.update(content=response.message) diff --git a/python/helpers/vector_db.py b/python/helpers/vector_db.py index 9fac3093a..01e444102 100644 --- a/python/helpers/vector_db.py +++ b/python/helpers/vector_db.py @@ -14,9 +14,11 @@ class VectorDB: - def __init__(self, embeddings_model, in_memory=False, memory_dir="./memory", knowledge_dir="./knowledge"): + def __init__(self, logger: Log, embeddings_model, in_memory=False, memory_dir="./memory", knowledge_dir="./knowledge"): + self.logger = logger + print("Initializing VectorDB...") - Log.log("info", content="Initializing VectorDB...") + self.logger.log("info", content="Initializing VectorDB...") self.embeddings_model = embeddings_model @@ -76,7 +78,7 @@ def preload_knowledge(self, kn_dir:str, db_dir:str): with open(index_path, 'r') as f: index = json.load(f) - index = knowledge_import.load_knowledge(kn_dir,index) + index = knowledge_import.load_knowledge(self.logger,kn_dir,index) for file in index: if index[file]['state'] in ['changed', 'removed'] and index[file].get('ids',[]): # for knowledge files that have been changed or removed and have IDs diff --git a/python/tools/call_subordinate.py b/python/tools/call_subordinate.py index 71f18e639..d754590a0 100644 --- a/python/tools/call_subordinate.py +++ b/python/tools/call_subordinate.py @@ -1,15 +1,13 @@ from agent import Agent from python.helpers.tool import Tool, Response -from python.helpers import files -from python.helpers.print_style import PrintStyle class Delegation(Tool): - def execute(self, message="", reset="", **kwargs): + async def execute(self, message="", reset="", **kwargs): # create subordinate agent using the data object on this agent and set superior agent to his data object if self.agent.get_data("subordinate") is None or str(reset).lower().strip() == "true": - subordinate = Agent(self.agent.number+1, self.agent.config) + subordinate = Agent(self.agent.number+1, self.agent.config, self.agent.context) subordinate.set_data("superior", self.agent) self.agent.set_data("subordinate", subordinate) # run subordinate agent message loop - return Response( message=self.agent.get_data("subordinate").message_loop(message), break_loop=False) \ No newline at end of file + return Response( message= await self.agent.get_data("subordinate").message_loop(message), break_loop=False) \ No newline at end of file diff --git a/python/tools/code_execution_tool.py b/python/tools/code_execution_tool.py index a6f4d75bb..825fe0443 100644 --- a/python/tools/code_execution_tool.py +++ b/python/tools/code_execution_tool.py @@ -1,17 +1,13 @@ +import asyncio from dataclasses import dataclass -import os, json, contextlib, subprocess, ast, shlex -from io import StringIO +import shlex import time -from typing import Literal -from python.helpers import files, messages -from agent import Agent from python.helpers.tool import Tool, Response from python.helpers import files from python.helpers.print_style import PrintStyle from python.helpers.shell_local import LocalInteractiveSession from python.helpers.shell_ssh import SSHInteractiveSession from python.helpers.docker import DockerContainerManager -from python.helpers.log import Log @dataclass class State: @@ -21,91 +17,91 @@ class State: class CodeExecution(Tool): - def execute(self,**kwargs): + async def execute(self,**kwargs): - if self.agent.handle_intervention(): return Response(message="", break_loop=False) # wait for intervention and handle it, if paused + await self.agent.handle_intervention() # wait for intervention and handle it, if paused - self.prepare_state() + await self.prepare_state() # os.chdir(files.get_abs_path("./work_dir")) #change CWD to work_dir runtime = self.args["runtime"].lower().strip() if runtime == "python": - response = self.execute_python_code(self.args["code"]) + response = await self.execute_python_code(self.args["code"]) elif runtime == "nodejs": - response = self.execute_nodejs_code(self.args["code"]) + response = await self.execute_nodejs_code(self.args["code"]) elif runtime == "terminal": - response = self.execute_terminal_command(self.args["code"]) + response = await self.execute_terminal_command(self.args["code"]) elif runtime == "output": - response = self.get_terminal_output() + response = await self.get_terminal_output() else: response = self.agent.read_prompt("fw.code_runtime_wrong.md", runtime=runtime) if not response: response = self.agent.read_prompt("fw.code_no_output.md") return Response(message=response, break_loop=False) - def before_execution(self, **kwargs): - if self.agent.handle_intervention(): return # wait for intervention and handle it, if paused + async def before_execution(self, **kwargs): + await self.agent.handle_intervention() # wait for intervention and handle it, if paused PrintStyle(font_color="#1B4F72", padding=True, background_color="white", bold=True).print(f"{self.agent.agent_name}: Using tool '{self.name}':") - self.log = Log(type="code_exe", heading=f"{self.agent.agent_name}: Using tool '{self.name}':", content="", kvps=self.args) + self.log = self.agent.context.log.log(type="code_exe", heading=f"{self.agent.agent_name}: Using tool '{self.name}':", content="", kvps=self.args) if self.args and isinstance(self.args, dict): for key, value in self.args.items(): PrintStyle(font_color="#85C1E9", bold=True).stream(self.nice_key(key)+": ") PrintStyle(font_color="#85C1E9", padding=isinstance(value,str) and "\n" in value).stream(value) PrintStyle().print() - def after_execution(self, response, **kwargs): + async def after_execution(self, response, **kwargs): msg_response = self.agent.read_prompt("fw.tool_response.md", tool_name=self.name, tool_response=response.message) - self.agent.append_message(msg_response, human=True) + await self.agent.append_message(msg_response, human=True) - def prepare_state(self): + async def prepare_state(self): self.state = self.agent.get_data("cot_state") if not self.state: #initialize docker container if execution in docker is configured if self.agent.config.code_exec_docker_enabled: - docker = DockerContainerManager(name=self.agent.config.code_exec_docker_name, image=self.agent.config.code_exec_docker_image, ports=self.agent.config.code_exec_docker_ports, volumes=self.agent.config.code_exec_docker_volumes) + docker = DockerContainerManager(logger=self.agent.context.log,name=self.agent.config.code_exec_docker_name, image=self.agent.config.code_exec_docker_image, ports=self.agent.config.code_exec_docker_ports, volumes=self.agent.config.code_exec_docker_volumes) docker.start_container() else: docker = None #initialize local or remote interactive shell insterface if self.agent.config.code_exec_ssh_enabled: - shell = SSHInteractiveSession(self.agent.config.code_exec_ssh_addr,self.agent.config.code_exec_ssh_port,self.agent.config.code_exec_ssh_user,self.agent.config.code_exec_ssh_pass) + shell = SSHInteractiveSession(self.agent.context.log,self.agent.config.code_exec_ssh_addr,self.agent.config.code_exec_ssh_port,self.agent.config.code_exec_ssh_user,self.agent.config.code_exec_ssh_pass) else: shell = LocalInteractiveSession() self.state = State(shell=shell,docker=docker) - shell.connect() + await shell.connect() self.agent.set_data("cot_state", self.state) - def execute_python_code(self, code): + async def execute_python_code(self, code): escaped_code = shlex.quote(code) command = f'python3 -c {escaped_code}' - return self.terminal_session(command) + return await self.terminal_session(command) - def execute_nodejs_code(self, code): + async def execute_nodejs_code(self, code): escaped_code = shlex.quote(code) command = f'node -e {escaped_code}' - return self.terminal_session(command) + return await self.terminal_session(command) - def execute_terminal_command(self, command): - return self.terminal_session(command) + async def execute_terminal_command(self, command): + return await self.terminal_session(command) - def terminal_session(self, command): + async def terminal_session(self, command): - if self.agent.handle_intervention(): return "" # wait for intervention and handle it, if paused + await self.agent.handle_intervention() # wait for intervention and handle it, if paused self.state.shell.send_command(command) PrintStyle(background_color="white",font_color="#1B4F72",bold=True).print(f"{self.agent.agent_name} code execution output:") - return self.get_terminal_output() + return await self.get_terminal_output() - def get_terminal_output(self): + async def get_terminal_output(self): idle=0 while True: - time.sleep(0.1) # Wait for some output to be generated - full_output, partial_output = self.state.shell.read_output() + await asyncio.sleep(0.1) # Wait for some output to be generated + full_output, partial_output = await self.state.shell.read_output() - if self.agent.handle_intervention(): return full_output # wait for intervention and handle it, if paused + await self.agent.handle_intervention() # wait for intervention and handle it, if paused if partial_output: PrintStyle(font_color="#85C1E9").stream(partial_output) diff --git a/python/tools/knowledge_tool.py b/python/tools/knowledge_tool.py index 97ce01dd5..a32245303 100644 --- a/python/tools/knowledge_tool.py +++ b/python/tools/knowledge_tool.py @@ -1,18 +1,14 @@ import os -from agent import Agent from python.helpers import perplexity_search from python.helpers import duckduckgo_search - from . import memory_tool import concurrent.futures - from python.helpers.tool import Tool, Response -from python.helpers import files from python.helpers.print_style import PrintStyle -from python.helpers.log import Log +from python.helpers.errors import handle_error class Knowledge(Tool): - def execute(self, question="", **kwargs): + async def execute(self, question="", **kwargs): with concurrent.futures.ThreadPoolExecutor() as executor: # Schedule the two functions to be run in parallel @@ -21,7 +17,7 @@ def execute(self, question="", **kwargs): perplexity = executor.submit(perplexity_search.perplexity_search, question) else: PrintStyle.hint("No API key provided for Perplexity. Skipping Perplexity search.") - Log(type="hint", content="No API key provided for Perplexity. Skipping Perplexity search.") + self.agent.context.log.log(type="hint", content="No API key provided for Perplexity. Skipping Perplexity search.") perplexity = None @@ -35,16 +31,19 @@ def execute(self, question="", **kwargs): try: perplexity_result = (perplexity.result() if perplexity else "") or "" except Exception as e: + handle_error(e) perplexity_result = "Perplexity search failed: " + str(e) try: duckduckgo_result = duckduckgo.result() except Exception as e: + handle_error(e) duckduckgo_result = "DuckDuckGo search failed: " + str(e) try: memory_result = future_memory.result() except Exception as e: + handle_error(e) memory_result = "Memory search failed: " + str(e) msg = self.agent.read_prompt("tool.knowledge.response.md", diff --git a/python/tools/memory_tool.py b/python/tools/memory_tool.py index e277521c7..f6182dc78 100644 --- a/python/tools/memory_tool.py +++ b/python/tools/memory_tool.py @@ -1,17 +1,16 @@ import re from agent import Agent from python.helpers.vector_db import VectorDB, Document -from python.helpers import files -import os, json +import os from python.helpers.tool import Tool, Response from python.helpers.print_style import PrintStyle -from python.helpers.log import Log +from python.helpers.errors import handle_error # databases based on subdirectories from agent config dbs = {} class Memory(Tool): - def execute(self,**kwargs): + async def execute(self,**kwargs): result="" try: @@ -26,9 +25,10 @@ def execute(self,**kwargs): elif "delete" in kwargs: result = delete(self.agent, kwargs["delete"]) except Exception as e: + handle_error(e) # hint about embedding change with existing database PrintStyle.hint("If you changed your embedding model, you will need to remove contents of /memory directory.") - Log(type="hint", content="If you changed your embedding model, you will need to remove contents of /memory directory.") + self.agent.context.log.log(type="hint", content="If you changed your embedding model, you will need to remove contents of /memory directory.") raise # result = process_query(self.agent, self.args["memory"],self.args["action"], result_count=self.agent.config.auto_memory_count) @@ -63,7 +63,7 @@ def get_db(agent: Agent): key = (mem_dir, kn_dir) if key not in dbs: - db = VectorDB(embeddings_model=agent.config.embeddings_model, in_memory=False, memory_dir=mem_dir, knowledge_dir=kn_dir) + db = VectorDB(agent.context.log,embeddings_model=agent.config.embeddings_model, in_memory=False, memory_dir=mem_dir, knowledge_dir=kn_dir) dbs[key] = db else: db = dbs[key] diff --git a/python/tools/response.py b/python/tools/response.py index f81fdcb2b..52ece8b5f 100644 --- a/python/tools/response.py +++ b/python/tools/response.py @@ -1,22 +1,14 @@ -from agent import Agent -from python.helpers import files -from python.helpers.print_style import PrintStyle - -from agent import Agent from python.helpers.tool import Tool, Response -from python.helpers import files -from python.helpers.print_style import PrintStyle -from python.helpers.log import Log class ResponseTool(Tool): - def execute(self,**kwargs): + async def execute(self,**kwargs): self.agent.set_data("timeout", self.agent.config.response_timeout_seconds) return Response(message=self.args["text"], break_loop=True) - def before_execution(self, **kwargs): - self.log = Log(type="response", heading=f"{self.agent.agent_name}: Responding:", content=self.args.get("text", "")) + async def before_execution(self, **kwargs): + self.log = self.agent.context.log.log(type="response", heading=f"{self.agent.agent_name}: Responding:", content=self.args.get("text", "")) - def after_execution(self, response, **kwargs): + async def after_execution(self, response, **kwargs): pass # do not add anything to the history or output \ No newline at end of file diff --git a/python/tools/task_done.py b/python/tools/task_done.py index 689e43a99..0acf00e70 100644 --- a/python/tools/task_done.py +++ b/python/tools/task_done.py @@ -1,21 +1,13 @@ -from agent import Agent -from python.helpers import files -from python.helpers.print_style import PrintStyle - -from agent import Agent from python.helpers.tool import Tool, Response -from python.helpers import files -from python.helpers.print_style import PrintStyle -from python.helpers.log import Log class TaskDone(Tool): - def execute(self,**kwargs): + async def execute(self,**kwargs): self.agent.set_data("timeout", 0) return Response(message=self.args["text"], break_loop=True) - def before_execution(self, **kwargs): - self.log = Log(type="response", heading=f"{self.agent.agent_name}: Task done:", content=self.args.get("text", "")) + async def before_execution(self, **kwargs): + self.log = self.agent.context.log.log(type="response", heading=f"{self.agent.agent_name}: Task done:", content=self.args.get("text", "")) - def after_execution(self, response, **kwargs): + async def after_execution(self, response, **kwargs): pass # do add anything to the history or output \ No newline at end of file diff --git a/python/tools/unknown.py b/python/tools/unknown.py index d0dc3f504..3f52c9af1 100644 --- a/python/tools/unknown.py +++ b/python/tools/unknown.py @@ -1,8 +1,7 @@ from python.helpers.tool import Tool, Response -from python.helpers import files class Unknown(Tool): - def execute(self, **kwargs): + async def execute(self, **kwargs): return Response( message=self.agent.read_prompt("fw.tool_not_found.md", tool_name=self.name, diff --git a/python/tools/webpage_content_tool.py b/python/tools/webpage_content_tool.py index b4f156cb7..25c138388 100644 --- a/python/tools/webpage_content_tool.py +++ b/python/tools/webpage_content_tool.py @@ -3,9 +3,11 @@ from urllib.parse import urlparse from newspaper import Article from python.helpers.tool import Tool, Response +from python.helpers.errors import handle_error + class WebpageContentTool(Tool): - def execute(self, url="", **kwargs): + async def execute(self, url="", **kwargs): if not url: return Response(message="Error: No URL provided.", break_loop=False) @@ -36,4 +38,5 @@ def execute(self, url="", **kwargs): except requests.RequestException as e: return Response(message=f"Error fetching webpage: {str(e)}", break_loop=False) except Exception as e: + handle_error(e) return Response(message=f"An error occurred: {str(e)}", break_loop=False) \ No newline at end of file diff --git a/run_cli.py b/run_cli.py index 597019846..e2f4406a0 100644 --- a/run_cli.py +++ b/run_cli.py @@ -1,7 +1,8 @@ +import asyncio import threading, time, models, os from ansio import application_keypad, mouse_input, raw_input from ansio.input import InputEvent, get_input_event -from agent import Agent, AgentConfig +from agent import AgentContext from python.helpers.print_style import PrintStyle from python.helpers.files import read_file from python.helpers import files @@ -9,17 +10,18 @@ from initialize import initialize +context: AgentContext = None # type: ignore input_lock = threading.Lock() -os.chdir(files.get_abs_path("./work_dir")) #change CWD to work_dir + # Main conversation loop -def chat(agent:Agent): +async def chat(context: AgentContext): # start the conversation loop while True: # ask user for message with input_lock: - timeout = agent.get_data("timeout") # how long the agent is willing to wait + timeout = context.agent0.get_data("timeout") # how long the agent is willing to wait if not timeout: # if agent wants to wait for user input forever PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User message ('e' to leave):") import readline # this fixes arrow keys in terminal @@ -33,7 +35,7 @@ def chat(agent:Agent): user_input = timeout_input("> ", timeout=timeout) if not user_input: - user_input = agent.read_prompt("fw.msg_timeout.md") + user_input = context.agent0.read_prompt("fw.msg_timeout.md") PrintStyle(font_color="white", padding=False).stream(f"{user_input}") else: user_input = user_input.strip() @@ -47,17 +49,17 @@ def chat(agent:Agent): if user_input.lower() == 'e': break # send message to agent0, - assistant_response = agent.message_loop(user_input) + assistant_response = await context.communicate(user_input).result() # print agent0 response - PrintStyle(font_color="white",background_color="#1D8348", bold=True, padding=True).print(f"{agent.agent_name}: reponse:") + PrintStyle(font_color="white",background_color="#1D8348", bold=True, padding=True).print(f"{context.agent0.agent_name}: reponse:") PrintStyle(font_color="white").print(f"{assistant_response}") # User intervention during agent streaming def intervention(): - if Agent.streaming_agent and not Agent.paused: - Agent.paused = True # stop agent streaming + if context.streaming_agent and not context.paused: + context.paused = True # stop agent streaming PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User intervention ('e' to leave, empty to continue):") import readline # this fixes arrow keys in terminal @@ -65,8 +67,8 @@ def intervention(): PrintStyle(font_color="white", padding=False, log_only=True).print(f"> {user_input}") if user_input.lower() == 'e': os._exit(0) # exit the conversation when the user types 'exit' - if user_input: Agent.streaming_agent.intervention_message = user_input # set intervention message if non-empty - Agent.paused = False # continue agent streaming + if user_input: context.streaming_agent.intervention_message = user_input # set intervention message if non-empty + context.paused = False # continue agent streaming # Capture keyboard input to trigger user intervention @@ -78,7 +80,7 @@ def capture_keys(): intervent = False time.sleep(0.1) - if Agent.streaming_agent: + if context.streaming_agent: # with raw_input, application_keypad, mouse_input: with input_lock, raw_input, application_keypad: event: InputEvent | None = get_input_event(timeout=0.1) @@ -97,5 +99,6 @@ def timeout_input(prompt, timeout=10): threading.Thread(target=capture_keys, daemon=True).start() # initialize and start the chat - agent0 = initialize() - chat(agent0) \ No newline at end of file + config = initialize() + context = AgentContext(config) + asyncio.run(chat(context)) \ No newline at end of file diff --git a/run_ui.py b/run_ui.py index 10e33153a..01637d5e6 100644 --- a/run_ui.py +++ b/run_ui.py @@ -1,18 +1,18 @@ +import asyncio from functools import wraps import os from pathlib import Path import threading +import uuid from flask import Flask, request, jsonify, Response from flask_basicauth import BasicAuth -from agent import Agent +from agent import AgentContext from initialize import initialize from python.helpers.files import get_abs_path from python.helpers.print_style import PrintStyle from python.helpers.log import Log from dotenv import load_dotenv -#global agent instance -agent0: Agent|None = None #initialize the internal Flask server app = Flask("app",static_folder=get_abs_path("./webui"),static_url_path="/") @@ -22,12 +22,10 @@ app.config['BASIC_AUTH_PASSWORD'] = os.environ.get('BASIC_AUTH_PASSWORD') or "admin" #default pass basic_auth = BasicAuth(app) -# get global agent -def get_agent(reset: bool = False) -> Agent: - global agent0 - if agent0 is None or reset: - agent0 = initialize() - return agent0 +# get context to run agent zero in +def get_context(ctxid:str): + if not ctxid: return AgentContext.first() or AgentContext(config=initialize()) + return AgentContext.get(ctxid) or AgentContext(config=initialize(),id=ctxid) # Now you can use @requires_auth function decorator to require login on certain pages def requires_auth(f): @@ -60,64 +58,46 @@ async def health_check(): # send message to agent (async UI) @app.route('/msg', methods=['POST']) -async def handle_message(): - try: - - #agent instance - agent = get_agent() - - #data sent to the server - input = request.get_json() - text = input.get("text", "") - - # print to console and log - PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User message:") - PrintStyle(font_color="white", padding=False).print(f"> {text}") - Log.log(type="user", heading="User message", content=text) - - #pass the message to the agent - threading.Thread(target=agent.communicate, args=(text,)).start() - - #data from this server - response = { - "ok": True, - "message": "Message received.", - } - - except Exception as e: - response = { - "ok": False, - "message": str(e), - } - - #respond with json - return jsonify(response) +async def handle_message_async(): + return await handle_message(False) # send message to agent (synchronous API) @app.route('/msg_sync', methods=['POST']) async def handle_msg_sync(): - try: - - #agent instance - agent = get_agent() + return await handle_message(True) +async def handle_message(sync:bool): + try: + #data sent to the server input = request.get_json() text = input.get("text", "") + ctxid = input.get("context", "") + blev = input.get("broadcast", 1) + + #context instance - get or create + context = get_context(ctxid) # print to console and log PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User message:") PrintStyle(font_color="white", padding=False).print(f"> {text}") - Log.log(type="user", heading="User message", content=text) - - #pass the message to the agent - response = agent.communicate(text) - - #data from this server - response = { - "ok": True, - "message": response, - } + context.log.log(type="user", heading="User message", content=text) + + if sync: + context.communicate(text) + result = await context.process.result() #type: ignore + response = { + "ok": True, + "message": result, + } + else: + + print("\n\n",(context.process and context.process.is_alive())) + context.communicate(text) + response = { + "ok": True, + "message": "Message received.", + } except Exception as e: response = { @@ -127,7 +107,7 @@ async def handle_msg_sync(): #respond with json return jsonify(response) - + # pausing/unpausing the agent @app.route('/pause', methods=['POST']) async def pause(): @@ -136,8 +116,12 @@ async def pause(): #data sent to the server input = request.get_json() paused = input.get("paused", False) + ctxid = input.get("context", "") + + #context instance - get or create + context = get_context(ctxid) - Agent.paused = paused + context.paused = paused response = { "ok": True, @@ -158,10 +142,15 @@ async def pause(): @app.route('/reset', methods=['POST']) async def reset(): try: - - agent = get_agent(reset=True) - Log.reset() + #data sent to the server + input = request.get_json() + ctxid = input.get("context", "") + + #context instance - get or create + context = get_context(ctxid) + context.reset() + response = { "ok": True, "message": "Agent restarted.", @@ -176,6 +165,32 @@ async def reset(): #respond with json return jsonify(response) +# killing context +@app.route('/remove', methods=['POST']) +async def remove(): + try: + + #data sent to the server + input = request.get_json() + ctxid = input.get("context", "") + + #context instance - get or create + AgentContext.remove(ctxid) + + response = { + "ok": True, + "message": "Context removed.", + } + + except Exception as e: + response = { + "ok": False, + "message": str(e), + } + + #respond with json + return jsonify(response) + # Web UI polling @app.route('/poll', methods=['POST']) async def poll(): @@ -183,19 +198,36 @@ async def poll(): #data sent to the server input = request.get_json() - from_no = input.get("log_from", "") - - logs = Log.logs[int(from_no):] - to = Log.last_updated #max(0, len(Log.logs)-1) + ctxid = input.get("context", uuid.uuid4()) + from_no = input.get("log_from", 0) + + #context instance - get or create + context = get_context(ctxid) + + + logs = context.log.output(start=from_no) + + # loop AgentContext._contexts + ctxs = [] + for ctx in AgentContext._contexts.values(): + ctxs.append({ + "id": ctx.id, + "no": ctx.no, + "log_guid": ctx.log.guid, + "log_version": len(ctx.log.updates), + "log_length": len(ctx.log.logs), + "paused": ctx.paused + }) #data from this server response = { "ok": True, + "context": context.id, + "contexts": ctxs, "logs": logs, - "log_to": to, - "log_guid": Log.guid, - "log_version": Log.version, - "paused": Agent.paused + "log_guid": context.log.guid, + "log_version": len(context.log.updates), + "paused": context.paused } except Exception as e: @@ -214,8 +246,6 @@ async def poll(): load_dotenv() - get_agent() #initialize - # Suppress only request logs but keep the startup messages from werkzeug.serving import WSGIRequestHandler class NoRequestLoggingWSGIRequestHandler(WSGIRequestHandler): diff --git a/test.py b/test.py index d1fab12ba..d556cbf4d 100644 --- a/test.py +++ b/test.py @@ -1,12 +1,22 @@ -from python.helpers.dirty_json import DirtyJson +from python.helpers.strings import calculate_valid_match_lengths +# first = b'python3 -c \'from selenium import webdriver\nfrom selenium.webdriver.chrome.service import Service\nfrom webdriver_manager.chrome import ChromeDriverManager\nimport time\n\n# Set up the Chromium WebDriver\noptions = webdriver.ChromeOptions()\noptions.add_argument(\'"\'"\'--headless\'"\'"\') # Run in headless mode\noptions.add_argument(\'"\'"\'--no-sandbox\'"\'"\')\noptions.add_argument(\'"\'"\'--disable-dev-shm-usage\'"\'"\')\n\n# Specify the correct version of ChromeDriver\nservice = Service(\'"\'"\'/root/.wdm/drivers/chromedriver/linux64/128.0.6613.113/chromedriver\'"\'"\')\ndriver = webdriver.Chrome(service=service, options=options)\n\n# Navigate to the LinkedIn profile\nurl = \'"\'"\'https://www.linkedin.com/in/jan-tomasek/\'"\'"\'\ndriver.get(url)\n\n# Wait for the page to load\ntime.sleep(5)\n\n# Save the page source to a file\nwith open(\'"\'"\'jan_tomasek_linkedin.html\'"\'"\', \'"\'"\'w\'"\'"\', encoding=\'"\'"\'utf-8\'"\'"\') as file:\n file.write(driver.page_source)\n\n# Close the WebDriver\ndriver.quit()\'\n' +first = b'https://www.linkedin.com/in/jan-tomasek/\'"\'"\'\ndriver.get(url)\n\n# Wait for the page to load\ntime.sleep(5)\n\n# Save the page source to a file\nwith open(\'"\'"\'jan_tomasek_linkedin.html\'"\'"\', \'"\'"\'w\'"\'"\', encoding=\'"\'"\'utf-8\'"\'"\') as file:\n file.write(driver.page_source)\n\n# Close the WebDriver\ndriver.quit()\'\n' -json_string = """ -{"key1": "value1", - "key2": "value2", - "key3": "value3" -} -""" +# second = b'python3 -c \'from selenium import webdriver\r\n\x1b[?2004l\r\x1b[?2004h> from selenium.webdriver.chrome.service import Service\r\n\x1b[?2004l\r\x1b[?2004h> from webdriver_manager.chrome import ChromeDriverManager\r\n\x1b[?2004l\r\x1b[?2004h> import time\r\n\x1b[?2004l\r\x1b[?2004h> \r\n\x1b[?2004l\r\x1b[?2004h> # Set up the Chromium WebDriver\r\n\x1b[?2004l\r\x1b[?2004h> options = webdriver.ChromeOptions()\r\n\x1b[?2004l\r\x1b[?2004h> options.add_argument(\'"\'"\'--headless\'"\'"\') # Run in headless mode\r\n\x1b[?2004l\r\x1b[?2004h> options.add_argument(\'"\'"\'--no-sandbox\'"\'"\')\r\n\x1b[?2004l\r\x1b[?2004h> options.add_argument(\'"\'"\'--disable-dev-shm-usage\'"\'"\')\r\n\x1b[?2004l\r\x1b[?2004h> \r\n\x1b[?2004l\r\x1b[?2004h> # Specify the correct version of ChromeDriver\r\n\x1b[?2004l\r\x1b[?2004h> service = Service(\'"\'"\'/root/.wdm/drivers/chromedriver/linux64/128.0.6613.113/chromedriver\'"\'"\')\r\n\x1b[?2004l\r\x1b[?2004h> driver = webdriver.Chrome(service=service, options=options)\r\n\x1b[?2004l\r\x1b[?2004h> \r\n\x1b[?2004l\r\x1b[?2004h> # Navigate to the LinkedIn profile\r\n\x1b[?2004l\r\x1b[?2004h> url = \'"\'"\'https://www.linkedin.com/in/jan-tomasek/\'"\'"\'\r\n\x1b[?' +second = b'https://www.linkedin.com/in/jan-tomasek/\'"\'"\'\r\n\x1b[?' -json = DirtyJson.parse_string(json_string) -print(json) \ No newline at end of file +trim_com, trim_out = calculate_valid_match_lengths( + first, second, deviation_threshold=8, deviation_reset=2, + ignore_patterns = [ + rb'\x1b\[\?\d{4}[a-zA-Z](?:> )?', # ANSI escape sequences + rb'\r', # Carriage return + rb'>\s', # Greater-than symbol + ], + debug=True) + +if(trim_com > 0 and trim_out > 0): + sec_tr = second[:trim_out] +else: sec_tr = "original" + +print(sec_tr) \ No newline at end of file diff --git a/webui/index.css b/webui/index.css index 869f63b83..c8838c543 100644 --- a/webui/index.css +++ b/webui/index.css @@ -209,6 +209,16 @@ h4 { transform: scale(0.95); } +.chat-list-button { + cursor: pointer; + color: inherit; /* Keep the text color the same as the surrounding text */ + text-decoration: none; /* Remove underline by default */ +} + +.chat-list-button:hover { + text-decoration: underline; /* Add underline on hover */ +} + #send-button { background-color: #bb86fc; } @@ -323,4 +333,8 @@ color: #6ec583; .disconnected{ color: #d87979; +} + +.font-bold{ + font-weight: bold; } \ No newline at end of file diff --git a/webui/index.html b/webui/index.html index 44407be99..5b9c68b45 100644 --- a/webui/index.html +++ b/webui/index.html @@ -8,7 +8,7 @@ @@ -21,7 +21,7 @@