From c279b487b0c1d019e4f64c707d8d96a2383cbea9 Mon Sep 17 00:00:00 2001
From: neb6dav
Date: Fri, 25 Oct 2024 20:33:58 -0400
Subject: [PATCH 1/6] Update initialize.py

Added Hugging Face model
---
 initialize.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/initialize.py b/initialize.py
index b741eae90..1300622b2 100644
--- a/initialize.py
+++ b/initialize.py
@@ -5,7 +5,7 @@ def initialize():
 
     # main chat model used by agents (smarter, more accurate)
-    chat_llm = models.get_openai_chat(model_name="gpt-4o-mini", temperature=0)
+    # chat_llm = models.get_openai_chat(model_name="gpt-4o-mini", temperature=0)
     # chat_llm = models.get_ollama_chat(model_name="llama3.2:3b-instruct-fp16", temperature=0)
     # chat_llm = models.get_lmstudio_chat(model_name="lmstudio-community/Meta-Llama-3.1-8B-Instruct-GGUF", temperature=0)
     # chat_llm = models.get_openrouter_chat(model_name="openai/o1-mini-2024-09-12")
@@ -15,6 +15,7 @@ def initialize():
     # chat_llm = models.get_mistral_chat(model_name="mistral-small-latest", temperature=0)
     # chat_llm = models.get_groq_chat(model_name="llama-3.2-90b-text-preview", temperature=0)
     # chat_llm = models.get_sambanova_chat(model_name="Meta-Llama-3.1-70B-Instruct-8k", temperature=0)
+    chat_llm = models.get_huggingface_chat(model_name="Qwen/Qwen2.5-72B-Instruct", temperature=0.01)
 
     # utility model used for helper functions (cheaper, faster)
     utility_llm = chat_llm

From cc04370c8f1fc236f3b357fb2daec23de9ce74af Mon Sep 17 00:00:00 2001
From: neb6dav
Date: Fri, 25 Oct 2024 20:34:40 -0400
Subject: [PATCH 2/6] Update example.env

Added Hugging Face API key
---
 example.env | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/example.env b/example.env
index d0bc2093b..d5cfa5db6 100644
--- a/example.env
+++ b/example.env
@@ -6,6 +6,7 @@ API_KEY_GOOGLE=
 API_KEY_MISTRAL=
 API_KEY_OPENROUTER=
 API_KEY_SAMBANOVA=
+API_KEY_HUGGINGFACE=
 
 API_KEY_OPENAI_AZURE=
 OPENAI_AZURE_ENDPOINT=
@@ -23,4 +24,4 @@ PYDEVD_DISABLE_FILE_VALIDATION=1
 OLLAMA_BASE_URL="http://127.0.0.1:11434"
 LM_STUDIO_BASE_URL="http://127.0.0.1:1234/v1"
 OPEN_ROUTER_BASE_URL="https://openrouter.ai/api/v1"
-SAMBANOVA_BASE_URL="https://fast-api.snova.ai/v1"
\ No newline at end of file
+SAMBANOVA_BASE_URL="https://fast-api.snova.ai/v1"

From d4df42f880b6b39aa591c94455bc82b1cb6921a5 Mon Sep 17 00:00:00 2001
From: neb6dav
Date: Fri, 25 Oct 2024 20:36:16 -0400
Subject: [PATCH 3/6] Update models.py

Added an import of HuggingFaceEndpoint from langchain_huggingface and added a
Hugging Face chat model helper. Temperature has to be positive.
---
 models.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/models.py b/models.py
index fa7c8d032..739974f45 100644
--- a/models.py
+++ b/models.py
@@ -5,7 +5,7 @@ from langchain_community.embeddings import OllamaEmbeddings
 from langchain_anthropic import ChatAnthropic
 from langchain_groq import ChatGroq
-from langchain_huggingface import HuggingFaceEmbeddings
+from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint
 from langchain_google_genai import GoogleGenerativeAI, HarmBlockThreshold, HarmCategory
 from langchain_mistralai import ChatMistralAI
 from pydantic.v1.types import SecretStr
@@ -35,6 +35,9 @@ def get_ollama_embedding(model_name:str, temperature=DEFAULT_TEMPERATURE, base_u
 def get_huggingface_embedding(model_name:str):
     return HuggingFaceEmbeddings(model_name=model_name)
 
+def get_huggingface_chat(model_name:str, api_key=get_api_key("huggingface"), temperature=0.01, base_url=os.getenv("HUGGINGFACE_BASE_URL") or "https://api-inference.huggingface.co/models/{model_name}"):
+    return HuggingFaceEndpoint(endpoint_url=model_name, temperature=temperature, huggingfacehub_api_token=api_key) # type: ignore
+
 # LM Studio and other OpenAI compatible interfaces
 def get_lmstudio_chat(model_name:str, temperature=DEFAULT_TEMPERATURE, base_url=os.getenv("LM_STUDIO_BASE_URL") or "http://127.0.0.1:1234/v1"):
     return ChatOpenAI(model_name=model_name, base_url=base_url, temperature=temperature, api_key="none") # type: ignore

From e116166a39d33851326d5a10701f47f427226ac2 Mon Sep 17 00:00:00 2001
From: neb6dav
Date: Sat, 26 Oct 2024 23:20:28 -0400
Subject: [PATCH 4/6] Update agent.py

Updated to allow a0 to generate subordinate roles and assign each one a unique
chat_model and utility_model, but not a separate embeddings model.
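For illustration (a sketch only, not part of the diff; `coder_chat_model` is a
placeholder for any chat model object), a role overrides only the models it
sets and inherits the rest from the default AgentConfig:

    from agent import AgentConfig, AgentAPIConfig

    # `default_config` is an AgentConfig like the one built in initialize.py
    default_config.subordinate_configs["coder"] = AgentAPIConfig(chat_model=coder_chat_model)

    resolved = default_config.get_subordinate_config("coder")
    # resolved.chat_model    -> coder_chat_model
    # resolved.utility_model -> default_config.utility_model (falls back to the default)
    # Embeddings stay shared: every agent keeps default_config.embeddings_model.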
--- agent.py | 684 ++++++++++++++++++++++++++----------------------------- 1 file changed, 329 insertions(+), 355 deletions(-) diff --git a/agent.py b/agent.py index 8ee08a743..c4ba846f0 100644 --- a/agent.py +++ b/agent.py @@ -1,30 +1,99 @@ import asyncio from dataclasses import dataclass, field import time, importlib, inspect, os, json -from typing import Any, Optional, Dict, TypedDict +from typing import Any, Optional, Dict, TypedDict, Callable import uuid -from python.helpers import extract_tools, rate_limiter, files, errors -from python.helpers.print_style import PrintStyle + from langchain.schema import AIMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.messages import HumanMessage, SystemMessage from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.language_models.llms import BaseLLM from langchain_core.embeddings import Embeddings + +from python.helpers import extract_tools, rate_limiter, files, errors +from python.helpers.print_style import PrintStyle import python.helpers.log as Log from python.helpers.dirty_json import DirtyJson from python.helpers.defer import DeferredTask -from typing import Callable +# Base Exception Classes +class InterventionException(Exception): + """Raised when intervention is needed - skips rest of message loop iteration""" + pass + +class RepairableException(Exception): + """Not forwarded to LLM, cannot be fixed on its own, ends message loop""" + pass -class AgentContext: +class HandledException(Exception): + """Indicates an exception has already been handled""" + pass + +# Configuration Classes +@dataclass +class AgentAPIConfig: + """Configuration class for agent-specific API settings""" + chat_model: Optional[BaseChatModel | BaseLLM] = None + utility_model: Optional[BaseChatModel | BaseLLM] = None + + def merge_with_default(self, default_config: 'AgentConfig') -> 'AgentAPIConfig': + """Merges this config with default config, preferring this config's non-None values""" + return AgentAPIConfig( + chat_model=self.chat_model or default_config.chat_model, + utility_model=self.utility_model or default_config.utility_model + ) + +@dataclass +class AgentConfig: + chat_model: BaseChatModel | BaseLLM + utility_model: BaseChatModel | BaseLLM + embeddings_model: Embeddings + prompts_subdir: str = "" + memory_subdir: str = "" + knowledge_subdirs: list[str] = field(default_factory=lambda: ["default", "custom"]) + auto_memory_count: int = 3 + auto_memory_skip: int = 2 + rate_limit_seconds: int = 60 + rate_limit_requests: int = 15 + rate_limit_input_tokens: int = 0 + rate_limit_output_tokens: int = 0 + msgs_keep_max: int = 25 + msgs_keep_start: int = 5 + msgs_keep_end: int = 10 + response_timeout_seconds: int = 60 + max_tool_response_length: int = 3000 + code_exec_docker_enabled: bool = True + code_exec_docker_name: str = "agent-zero-exe" + code_exec_docker_image: str = "frdel/agent-zero-exe:latest" + code_exec_docker_ports: dict[str, int] = field( + default_factory=lambda: {"22/tcp": 50022} + ) + code_exec_docker_volumes: dict[str, dict[str, str]] = field( + default_factory=lambda: { + files.get_abs_path("work_dir"): {"bind": "/root", "mode": "rw"}, + files.get_abs_path("instruments"): {"bind": "/instruments", "mode": "rw"}, + } + ) + code_exec_ssh_enabled: bool = True + code_exec_ssh_addr: str = "localhost" + code_exec_ssh_port: int = 50022 + code_exec_ssh_user: str = "root" + code_exec_ssh_pass: str = "toor" + additional: Dict[str, Any] = field(default_factory=dict) + 
subordinate_configs: Dict[str, AgentAPIConfig] = field(default_factory=dict) + + def get_subordinate_config(self, role: str) -> AgentAPIConfig: + """Get API configuration for a specific subordinate role""" + return self.subordinate_configs.get(role, AgentAPIConfig()).merge_with_default(self) +class AgentContext: _contexts: dict[str, "AgentContext"] = {} _counter: int = 0 def __init__( self, - config: "AgentConfig", + config: AgentConfig, id: str | None = None, name: str | None = None, agent0: "Agent|None" = None, @@ -32,7 +101,6 @@ def __init__( paused: bool = False, streaming_agent: "Agent|None" = None, ): - # build context self.id = id or str(uuid.uuid4()) self.name = name self.config = config @@ -43,27 +111,26 @@ def __init__( self.process: DeferredTask | None = None AgentContext._counter += 1 self.no = AgentContext._counter - - self._contexts[self.id] = self + AgentContext._contexts[self.id] = self @staticmethod - def get(id: str): + def get(id: str) -> Optional["AgentContext"]: return AgentContext._contexts.get(id, None) @staticmethod - def first(): + def first() -> Optional["AgentContext"]: if not AgentContext._contexts: return None return list(AgentContext._contexts.values())[0] @staticmethod - def remove(id: str): + def remove(id: str) -> Optional["AgentContext"]: context = AgentContext._contexts.pop(id, None) if context and context.process: context.process.kill() return context - def reset(self): + def reset(self) -> None: if self.process: self.process.kill() self.log.reset() @@ -71,30 +138,24 @@ def reset(self): self.streaming_agent = None self.paused = False - def communicate(self, msg: str, broadcast_level: int = 1): + def communicate(self, msg: str, broadcast_level: int = 1) -> DeferredTask: self.paused = False # unpause if paused - if self.streaming_agent: - current_agent = self.streaming_agent - else: - current_agent = self.agent0 + current_agent = self.streaming_agent if self.streaming_agent else self.agent0 if self.process and self.process.is_alive(): - # set intervention messages to agent(s): + # Set intervention messages to agent(s) intervention_agent = current_agent while intervention_agent and broadcast_level != 0: intervention_agent.intervention_message = msg broadcast_level -= 1 - intervention_agent = intervention_agent.data.get("superior", None) + intervention_agent = intervention_agent.get_data("superior") else: - - # self.process = DeferredTask(current_agent.monologue, msg) self.process = DeferredTask(self._process_chain, current_agent, msg) return self.process - # this wrapper ensures that superior agents are called back if the chat was loaded from file and original callstack is gone - async def _process_chain(self, agent: 'Agent', msg: str, user=True): + async def _process_chain(self, agent: 'Agent', msg: str, user: bool = True) -> str: try: msg_template = ( agent.read_prompt("fw.user_message.md", message=msg) @@ -106,125 +167,75 @@ async def _process_chain(self, agent: 'Agent', msg: str, user=True): ) ) response = await agent.monologue(msg_template) - superior = agent.data.get("superior", None) + superior = agent.get_data("superior") if superior: response = await self._process_chain(superior, response, False) return response except Exception as e: agent.handle_critical_exception(e) + raise -@dataclass -class AgentConfig: - chat_model: BaseChatModel | BaseLLM - utility_model: BaseChatModel | BaseLLM - embeddings_model: Embeddings - prompts_subdir: str = "" - memory_subdir: str = "" - knowledge_subdirs: list[str] = field(default_factory=lambda: ["default", 
"custom"]) - auto_memory_count: int = 3 - auto_memory_skip: int = 2 - rate_limit_seconds: int = 60 - rate_limit_requests: int = 15 - rate_limit_input_tokens: int = 0 - rate_limit_output_tokens: int = 0 - msgs_keep_max: int = 25 - msgs_keep_start: int = 5 - msgs_keep_end: int = 10 - response_timeout_seconds: int = 60 - max_tool_response_length: int = 3000 - code_exec_docker_enabled: bool = True - code_exec_docker_name: str = "agent-zero-exe" - code_exec_docker_image: str = "frdel/agent-zero-exe:latest" - code_exec_docker_ports: dict[str, int] = field( - default_factory=lambda: {"22/tcp": 50022} - ) - code_exec_docker_volumes: dict[str, dict[str, str]] = field( - default_factory=lambda: { - files.get_abs_path("work_dir"): {"bind": "/root", "mode": "rw"}, - files.get_abs_path("instruments"): {"bind": "/instruments", "mode": "rw"}, - } - ) - code_exec_ssh_enabled: bool = True - code_exec_ssh_addr: str = "localhost" - code_exec_ssh_port: int = 50022 - code_exec_ssh_user: str = "root" - code_exec_ssh_pass: str = "toor" - additional: Dict[str, Any] = field(default_factory=dict) - +class LoopData: + def __init__(self): + self.iteration: int = -1 + self.system: list[str] = [] + self.message: str = "" + self.history_from: int = 0 + self.history: list = [] class Message: def __init__(self): - self.segments: list[str] - self.human: bool - + self.segments: list[str] = [] + self.human: bool = False + self.timestamp: float = time.time() class Monologue: def __init__(self): - self.done = False + self.done: bool = False self.summary: str = "" self.messages: list[Message] = [] + self.start_time: float = time.time() def finish(self): - pass - + self.done = True + self.end_time: float = time.time() class History: def __init__(self): self.monologues: list[Monologue] = [] self.start_monologue() - def current_monologue(self): + def current_monologue(self) -> Monologue: return self.monologues[-1] - def start_monologue(self): + def start_monologue(self) -> Monologue: if self.monologues: self.current_monologue().finish() self.monologues.append(Monologue()) return self.current_monologue() - -class LoopData: - def __init__(self): - self.iteration = -1 - self.system = [] - self.message = "" - self.history_from = 0 - self.history = [] - - -# intervention exception class - skips rest of message loop iteration -class InterventionException(Exception): - pass - - -# killer exception class - not forwarded to LLM, cannot be fixed on its own, ends message loop -class RepairableException(Exception): - pass - - -class HandledException(Exception): - pass - - class Agent: - def __init__( - self, number: int, config: AgentConfig, context: AgentContext | None = None + self, + number: int, + config: AgentConfig, + context: Optional['AgentContext'] = None, + role: str = "", ): - - # agent config - self.config = config - - # agent context - self.context = context or AgentContext(config) - - # non-config vars + # Core initialization + self.config = AgentConfig(**vars(config)) + self.role = role self.number = number self.agent_name = f"Agent {self.number}" - - self.history = [] - self.last_message = "" - self.intervention_message = "" + self.context = context or AgentContext(config) + + # State management + self.history: list[Any] = [] + self.last_message: str = "" + self.intervention_message: str = "" + self.data: Dict[str, Any] = {} + + # Rate limiting self.rate_limiter = rate_limiter.RateLimiter( self.context.log, max_calls=self.config.rate_limit_requests, @@ -232,189 +243,172 @@ def __init__( 
max_output_tokens=self.config.rate_limit_output_tokens, window_seconds=self.config.rate_limit_seconds, ) - self.data = {} # free data object all the tools can use - async def monologue(self, msg: str): + # Apply role-specific configuration + if role and role in self.config.subordinate_configs: + role_config = self.config.get_subordinate_config(role) + if role_config.chat_model: + self.config.chat_model = role_config.chat_model + if role_config.utility_model: + self.config.utility_model = role_config.utility_model + + async def monologue(self, msg: str) -> str: while True: try: - # loop data dictionary to pass to extensions loop_data = LoopData() loop_data.message = msg loop_data.history_from = len(self.history) - # call monologue_start extensions + # Start monologue extensions await self.call_extensions("monologue_start", loop_data=loop_data) printer = PrintStyle(italic=True, font_color="#b3ffd9", padding=False) - user_message = loop_data.message - await self.append_message(user_message, human=True) + await self.append_message(msg, human=True) - # let the agent run message loop until he stops it with a response tool while True: - - self.context.streaming_agent = self # mark self as current streamer + self.context.streaming_agent = self agent_response = "" loop_data.iteration += 1 try: - - # set system prompt and message history + # Prepare system prompt and history loop_data.system = [] loop_data.history = self.history - - # and allow extensions to edit them - await self.call_extensions( - "message_loop_prompts", loop_data=loop_data - ) - - # build chain from system prompt, message history and model - prompt = ChatPromptTemplate.from_messages( - [ - SystemMessage(content="\n\n".join(loop_data.system)), - MessagesPlaceholder(variable_name="messages"), - ] - ) - chain = prompt | self.config.chat_model - - # rate limiter TODO - move to extension, make per-model - formatted_inputs = prompt.format(messages=self.history) - tokens = int(len(formatted_inputs) / 4) - self.rate_limiter.limit_call_and_input(tokens) - - # output that the agent is starting - PrintStyle( - bold=True, - font_color="green", - padding=True, - background_color="white", - ).print(f"{self.agent_name}: Generating") - log = self.context.log.log( - type="agent", heading=f"{self.agent_name}: Generating" - ) - - async for chunk in chain.astream( - {"messages": loop_data.history} - ): - await self.handle_intervention( - agent_response - ) # wait for intervention and handle it, if paused - - if isinstance(chunk, str): - content = chunk - elif hasattr(chunk, "content"): - content = str(chunk.content) - else: - content = str(chunk) - - if content: - printer.stream( - content - ) # output the agent response stream - agent_response += ( - content # concatenate stream into the response - ) - self.log_from_stream(agent_response, log) - - self.rate_limiter.set_output_tokens( - int(len(agent_response) / 4) - ) # rough estimation - - await self.handle_intervention(agent_response) - - if ( - self.last_message == agent_response - ): # if assistant_response is the same as last message in history, let him know - await self.append_message( - agent_response - ) # Append the assistant's response to the history - warning_msg = self.read_prompt("fw.msg_repeat.md") - await self.append_message( - warning_msg, human=True - ) # Append warning message to the history - PrintStyle(font_color="orange", padding=True).print( - warning_msg - ) - self.context.log.log(type="warning", content=warning_msg) - - else: # otherwise proceed with tool - await 
self.append_message( - agent_response - ) # Append the assistant's response to the history - tools_result = await self.process_tools( - agent_response - ) # process tools requested in agent message - if tools_result: # final response of message loop available - return tools_result # break the execution if the task is done - - # exceptions inside message loop: - except InterventionException as e: - pass # intervention message has been handled in handle_intervention(), proceed with conversation loop - except ( - RepairableException - ) as e: # Forward repairable errors to the LLM, maybe it can fix them - error_message = errors.format_error(e) - msg_response = self.read_prompt( - "fw.error.md", error=error_message - ) # error message template - await self.append_message(msg_response, human=True) - PrintStyle(font_color="red", padding=True).print(msg_response) - self.context.log.log(type="error", content=msg_response) - except Exception as e: # Other exception kill the loop + await self.call_extensions("message_loop_prompts", loop_data=loop_data) + + # Build and execute chain + response = await self._execute_chain(loop_data, printer) + + # Process response + if response: + return response + + except InterventionException: + continue # Continue with conversation loop + except RepairableException as e: + await self._handle_repairable_error(e) + except Exception as e: self.handle_critical_exception(e) finally: - # call message_loop_end extensions - await self.call_extensions( - "message_loop_end", loop_data=loop_data - ) - - # exceptions outside message loop: - except InterventionException as e: - pass # just start over + await self.call_extensions("message_loop_end", loop_data=loop_data) + + except InterventionException: + continue # Start over except Exception as e: self.handle_critical_exception(e) finally: - self.context.streaming_agent = None # unset current streamer - # call monologue_end extensions - await self.call_extensions("monologue_end", loop_data=loop_data) # type: ignore + self.context.streaming_agent = None + await self.call_extensions("monologue_end", loop_data=loop_data) + + async def _execute_chain(self, loop_data: LoopData, printer: PrintStyle) -> Optional[str]: + # Build chain + prompt = ChatPromptTemplate.from_messages([ + SystemMessage(content="\n\n".join(loop_data.system)), + MessagesPlaceholder(variable_name="messages"), + ]) + chain = prompt | self.config.chat_model + + # Rate limiting + formatted_inputs = prompt.format(messages=self.history) + tokens = int(len(formatted_inputs) / 4) + self.rate_limiter.limit_call_and_input(tokens) + + # Execute chain + PrintStyle(bold=True, font_color="green", padding=True, background_color="white").print( + f"{self.agent_name}: Generating" + ) + log = self.context.log.log(type="agent", heading=f"{self.agent_name}: Generating") + + agent_response = "" + async for chunk in chain.astream({"messages": loop_data.history}): + await self.handle_intervention(agent_response) + + content = self._extract_chunk_content(chunk) + if content: + printer.stream(content) + agent_response += content + self.log_from_stream(agent_response, log) + + self.rate_limiter.set_output_tokens(int(len(agent_response) / 4)) + await self.handle_intervention(agent_response) + + # Handle response + if self.last_message == agent_response: + await self._handle_repeated_message(agent_response) + return None - def handle_critical_exception(self, exception: Exception): + await self.append_message(agent_response) + return await self.process_tools(agent_response) + + def 
handle_critical_exception(self, exception: Exception) -> None: if isinstance(exception, HandledException): - raise exception # Re-raise the exception to kill the loop + raise exception elif isinstance(exception, asyncio.CancelledError): - # Handling for asyncio.CancelledError PrintStyle(font_color="white", background_color="red", padding=True).print( f"Context {self.context.id} terminated during message loop" ) - raise HandledException( - exception - ) # Re-raise the exception to cancel the loop + raise HandledException(exception) else: - # Handling for general exceptions error_message = errors.format_error(exception) PrintStyle(font_color="red", padding=True).print(error_message) self.context.log.log(type="error", content=error_message) - raise HandledException(exception) # Re-raise the exception to kill the loop + raise HandledException(exception) + + async def handle_intervention(self, progress: str = "") -> None: + while self.context.paused: + await asyncio.sleep(0.1) + if self.intervention_message: + msg = self.intervention_message + self.intervention_message = "" + if progress.strip(): + await self.append_message(progress) + user_msg = self.read_prompt("fw.intervention.md", user_message=msg) + await self.append_message(user_msg, human=True) + raise InterventionException(msg) + + async def create_subordinate(self, role: str = "") -> 'Agent': + subordinate = Agent( + number=self.number + 1, + config=self.config, + context=self.context, + role=role + ) + subordinate.set_data("superior", self) + self.set_data("subordinate", subordinate) + return subordinate + + # Helper methods + def get_data(self, field: str) -> Any: + return self.data.get(field, None) + + def set_data(self, field: str, value: Any) -> None: + self.data[field] = value def read_prompt(self, file: str, **kwargs) -> str: prompt_dir = files.get_abs_path("prompts/default") backup_dir = [] - if ( - self.config.prompts_subdir - ): # if agent has custom folder, use it and use default as backup + if self.config.prompts_subdir: prompt_dir = files.get_abs_path("prompts", self.config.prompts_subdir) backup_dir.append(files.get_abs_path("prompts/default")) return files.read_file( - files.get_abs_path(prompt_dir, file), backup_dirs=backup_dir, **kwargs + files.get_abs_path(prompt_dir, file), + backup_dirs=backup_dir, + **kwargs ) - def get_data(self, field: str): - return self.data.get(field, None) + def log_from_stream(self, stream: str, log_item: Log.LogItem) -> None: + try: + if len(stream) >= 25: + response = DirtyJson.parse_string(stream) + if isinstance(response, dict): + log_item.update(content=stream, kvps=response) + except Exception: + pass - def set_data(self, field: str, value): - self.data[field] = value + # Continuing the Agent class... 
- async def append_message(self, msg: str, human: bool = False): + async def append_message(self, msg: str, human: bool = False) -> None: message_type = "human" if human else "ai" if self.history and self.history[-1].type == message_type: self.history[-1].content += "\n\n" + msg @@ -429,58 +423,41 @@ async def append_message(self, msg: str, human: bool = False): if message_type == "ai": self.last_message = msg - def concat_messages(self, messages): - return "\n".join([f"{msg.type}: {msg.content}" for msg in messages]) - - async def call_utility_llm( - self, system: str, msg: str, callback: Callable[[str], None] | None = None - ): - prompt = ChatPromptTemplate.from_messages( - [SystemMessage(content=system), HumanMessage(content=msg)] - ) - - chain = prompt | self.config.utility_model - response = "" - - formatted_inputs = prompt.format() - tokens = int(len(formatted_inputs) / 4) - self.rate_limiter.limit_call_and_input(tokens) - - async for chunk in chain.astream({}): - await self.handle_intervention() # wait for intervention and handle it, if paused - - if isinstance(chunk, str): - content = chunk - elif hasattr(chunk, "content"): - content = str(chunk.content) - else: - content = str(chunk) - - if callback: - callback(content) + async def cleanup_history(self, max_msgs: int, keep_start: int, keep_end: int) -> list: + if len(self.history) <= max_msgs: + return self.history - response += content + first_x = self.history[:keep_start] + last_y = self.history[-keep_end:] + middle_part = self.history[keep_start:-keep_end] - self.rate_limiter.set_output_tokens(int(len(response) / 4)) + # Ensure first message in middle is "human" + if middle_part and middle_part[0].type != "human": + if first_x: + middle_part.insert(0, first_x.pop()) - return response + # Ensure odd number of messages in middle + if len(middle_part) % 2 == 0: + middle_part = middle_part[:-1] - def get_last_message(self): - if self.history: - return self.history[-1] + new_middle_part = await self.replace_middle_messages(middle_part) + self.history = first_x + new_middle_part + last_y + return self.history - async def replace_middle_messages(self, middle_messages): + async def replace_middle_messages(self, middle_messages: list) -> list: cleanup_prompt = self.read_prompt("fw.msg_cleanup.md") - log_item = self.context.log.log( - type="util", heading="Mid messages cleanup summary" - ) + log_item = self.context.log.log(type="util", heading="Mid messages cleanup summary") PrintStyle( - bold=True, font_color="orange", padding=True, background_color="white" + bold=True, + font_color="orange", + padding=True, + background_color="white" ).print(f"{self.agent_name}: Mid messages cleanup summary") + printer = PrintStyle(italic=True, font_color="orange", padding=False) - def log_callback(content): + def log_callback(content: str) -> None: printer.print(content) log_item.stream(content=content) @@ -489,57 +466,39 @@ def log_callback(content): msg=self.concat_messages(middle_messages), callback=log_callback, ) - new_human_message = HumanMessage(content=summary) - return [new_human_message] - - async def cleanup_history(self, max: int, keep_start: int, keep_end: int): - if len(self.history) <= max: - return self.history - - first_x = self.history[:keep_start] - last_y = self.history[-keep_end:] - - # Identify the middle part - middle_part = self.history[keep_start:-keep_end] + return [HumanMessage(content=summary)] - # Ensure the first message in the middle is "human", if not, move one message back - if middle_part and middle_part[0].type != 
"human": - if len(first_x) > 0: - middle_part.insert(0, first_x.pop()) - - # Ensure the middle part has an odd number of messages - if len(middle_part) % 2 == 0: - middle_part = middle_part[:-1] + async def call_utility_llm( + self, + system: str, + msg: str, + callback: Optional[Callable[[str], None]] = None + ) -> str: + prompt = ChatPromptTemplate.from_messages([ + SystemMessage(content=system), + HumanMessage(content=msg) + ]) - # Replace the middle part using the replacement function - new_middle_part = await self.replace_middle_messages(middle_part) + chain = prompt | self.config.utility_model + response = "" - self.history = first_x + new_middle_part + last_y + formatted_inputs = prompt.format() + tokens = int(len(formatted_inputs) / 4) + self.rate_limiter.limit_call_and_input(tokens) - return self.history + async for chunk in chain.astream({}): + await self.handle_intervention() + content = self._extract_chunk_content(chunk) + + if callback and content: + callback(content) + + response += content - async def handle_intervention(self, progress: str = ""): - while self.context.paused: - await asyncio.sleep(0.1) # wait if paused - if ( - self.intervention_message - ): # if there is an intervention message, but not yet processed - msg = self.intervention_message - self.intervention_message = "" # reset the intervention message - if progress.strip(): - await self.append_message( - progress - ) # append the response generated so far - user_msg = self.read_prompt( - "fw.intervention.md", user_message=msg - ) # format the user intervention template - await self.append_message( - user_msg, human=True - ) # append the intervention message - raise InterventionException(msg) + self.rate_limiter.set_output_tokens(int(len(response) / 4)) + return response - async def process_tools(self, msg: str): - # search for tool usage requests in agent message + async def process_tools(self, msg: str) -> Optional[str]: tool_request = extract_tools.json_parse_dirty(msg) if tool_request is not None: @@ -547,34 +506,25 @@ async def process_tools(self, msg: str): tool_args = tool_request.get("tool_args", {}) tool = self.get_tool(tool_name, tool_args, msg) - await self.handle_intervention() # wait if paused and handle intervention message if needed + await self.handle_intervention() await tool.before_execution(**tool_args) - await self.handle_intervention() # wait if paused and handle intervention message if needed + + await self.handle_intervention() response = await tool.execute(**tool_args) - await self.handle_intervention() # wait if paused and handle intervention message if needed + + await self.handle_intervention() await tool.after_execution(response) - await self.handle_intervention() # wait if paused and handle intervention message if needed + + await self.handle_intervention() if response.break_loop: return response.message else: - msg = self.read_prompt("fw.msg_misformat.md") - await self.append_message(msg, human=True) - PrintStyle(font_color="red", padding=True).print(msg) - self.context.log.log( - type="error", content=f"{self.agent_name}: Message misformat" - ) - - def log_from_stream(self, stream: str, logItem: Log.LogItem): - try: - if len(stream) < 25: - return # no reason to try - response = DirtyJson.parse_string(stream) - if isinstance(response, dict): - logItem.update( - content=stream, kvps=response - ) # log if result is a dictionary already - except Exception as e: - pass + error_msg = self.read_prompt("fw.msg_misformat.md") + await self.append_message(error_msg, human=True) + 
PrintStyle(font_color="red", padding=True).print(error_msg) + self.context.log.log(type="error", content=f"{self.agent_name}: Message misformat") + + return None def get_tool(self, name: str, args: dict, message: str, **kwargs): from python.tools.unknown import Unknown @@ -594,3 +544,27 @@ async def call_extensions(self, folder: str, **kwargs) -> Any: ) for cls in classes: await cls(agent=self).execute(**kwargs) + + def concat_messages(self, messages: list) -> str: + return "\n".join([f"{msg.type}: {msg.content}" for msg in messages]) + + def _extract_chunk_content(self, chunk: Any) -> str: + if isinstance(chunk, str): + return chunk + elif hasattr(chunk, "content"): + return str(chunk.content) + return str(chunk) + + async def _handle_repairable_error(self, error: RepairableException) -> None: + error_message = errors.format_error(error) + msg_response = self.read_prompt("fw.error.md", error=error_message) + await self.append_message(msg_response, human=True) + PrintStyle(font_color="red", padding=True).print(msg_response) + self.context.log.log(type="error", content=msg_response) + + async def _handle_repeated_message(self, agent_response: str) -> None: + await self.append_message(agent_response) + warning_msg = self.read_prompt("fw.msg_repeat.md") + await self.append_message(warning_msg, human=True) + PrintStyle(font_color="orange", padding=True).print(warning_msg) + self.context.log.log(type="warning", content=warning_msg) From 9107127e1bf7f0ccb44b65f65b089a4a1b3ec28a Mon Sep 17 00:00:00 2001 From: neb6dav Date: Sat, 26 Oct 2024 23:21:19 -0400 Subject: [PATCH 5/6] Update initialize.py Updated to allow for a0 to create subordinates with "defined" roles, which include unique chat_model and utility_model. --- initialize.py | 60 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/initialize.py b/initialize.py index 1300622b2..748548211 100644 --- a/initialize.py +++ b/initialize.py @@ -1,11 +1,11 @@ import models -from agent import AgentConfig +from agent import AgentConfig, AgentAPIConfig from python.helpers import files def initialize(): # main chat model used by agents (smarter, more accurate) - # chat_llm = models.get_openai_chat(model_name="gpt-4o-mini", temperature=0) + chat_llm = models.get_openai_chat(model_name="gpt-4-turbo-preview", temperature=0) # chat_llm = models.get_ollama_chat(model_name="llama3.2:3b-instruct-fp16", temperature=0) # chat_llm = models.get_lmstudio_chat(model_name="lmstudio-community/Meta-Llama-3.1-8B-Instruct-GGUF", temperature=0) # chat_llm = models.get_openrouter_chat(model_name="openai/o1-mini-2024-09-12") @@ -15,50 +15,72 @@ def initialize(): # chat_llm = models.get_mistral_chat(model_name="mistral-small-latest", temperature=0) # chat_llm = models.get_groq_chat(model_name="llama-3.2-90b-text-preview", temperature=0) # chat_llm = models.get_sambanova_chat(model_name="Meta-Llama-3.1-70B-Instruct-8k", temperature=0) - chat_llm = models.get_huggingface_chat(model_name="Qwen/Qwen2.5-72B-Instruct", temperature=0.01) # utility model used for helper functions (cheaper, faster) - utility_llm = chat_llm + utility_llm = models.get_openai_chat(model_name="gpt-3.5-turbo", temperature=0) - # embedding model used for memory - embedding_llm = models.get_openai_embedding(model_name="text-embedding-3-small") + # embedding model used for memory - shared across all agents for consistency + embedding_llm = models.get_openai_embedding(model_name="text-embedding-3-large") # embedding_llm = 
models.get_ollama_embedding(model_name="nomic-embed-text")
     # embedding_llm = models.get_huggingface_embedding(model_name="sentence-transformers/all-MiniLM-L6-v2")
     # embedding_llm = models.get_lmstudio_embedding(model_name="nomic-ai/nomic-embed-text-v1.5-GGUF")
 
+    # Define specialized configurations for different subordinate roles
+    coder_config = AgentAPIConfig(
+        chat_model=models.get_anthropic_chat(model_name="claude-3-opus-20240229", temperature=0),
+        utility_model=models.get_openai_chat(model_name="gpt-3.5-turbo", temperature=0)
+    )
+
+    researcher_config = AgentAPIConfig(
+        chat_model=models.get_openai_chat(model_name="gpt-4-turbo-preview", temperature=0),
+        utility_model=models.get_openai_chat(model_name="gpt-3.5-turbo", temperature=0)
+    )
+
+    writer_config = AgentAPIConfig(
+        chat_model=models.get_anthropic_chat(model_name="claude-3-sonnet-20240229", temperature=0.7),
+        utility_model=models.get_openai_chat(model_name="gpt-3.5-turbo", temperature=0)
+    )
+
     # agent configuration
     config = AgentConfig(
-        chat_model = chat_llm,
-        utility_model = utility_llm,
-        embeddings_model = embedding_llm,
-        # prompts_subdir = "default",
+        chat_model=chat_llm,
+        utility_model=utility_llm,
+        embeddings_model=embedding_llm,
+        # prompts_subdir = "dianoia-xl",
         # memory_subdir = "",
-        knowledge_subdirs = ["default","custom"],
-        auto_memory_count = 0,
+        knowledge_subdirs=["default", "custom"],
+        auto_memory_count=0,
         # auto_memory_skip = 2,
         # rate_limit_seconds = 60,
-        rate_limit_requests = 30,
+        rate_limit_requests=30,
         # rate_limit_input_tokens = 0,
         # rate_limit_output_tokens = 0,
         # msgs_keep_max = 25,
         # msgs_keep_start = 5,
         # msgs_keep_end = 10,
-        max_tool_response_length = 3000,
+        max_tool_response_length=3000,
         # response_timeout_seconds = 60,
-        code_exec_docker_enabled = True,
+        code_exec_docker_enabled=True,
         # code_exec_docker_name = "agent-zero-exe",
         # code_exec_docker_image = "frdel/agent-zero-exe:latest",
         # code_exec_docker_ports = { "22/tcp": 50022 }
         # code_exec_docker_volumes = {
-        #     files.get_abs_path("work_dir"): {"bind": "/root", "mode": "rw"},
-        #     files.get_abs_path("instruments"): {"bind": "/instruments", "mode": "rw"},
-        # },
-        code_exec_ssh_enabled = True,
+        #     files.get_abs_path("work_dir"): {"bind": "/root", "mode": "rw"},
+        #     files.get_abs_path("instruments"): {"bind": "/instruments", "mode": "rw"},
+        # },
+        code_exec_ssh_enabled=True,
         # code_exec_ssh_addr = "localhost",
         # code_exec_ssh_port = 50022,
         # code_exec_ssh_user = "root",
         # code_exec_ssh_pass = "toor",
         # additional = {},
+
+        # Role-specific API configurations for subordinate agents
+        subordinate_configs={
+            "coder": coder_config,
+            "researcher": researcher_config,
+            "writer": writer_config
+        }
     )
 
     # return config object

From 99f0020c86535c16fc842083f3bcf64089c94e28 Mon Sep 17 00:00:00 2001
From: neb6dav
Date: Sat, 26 Oct 2024 23:22:15 -0400
Subject: [PATCH 6/6] Update call_subordinate.py

Updated the tool so that it can utilize a subordinate's unique chat_model and
utility_model.
---
 python/tools/call_subordinate.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/python/tools/call_subordinate.py b/python/tools/call_subordinate.py
index 227e87a03..f74f0de4f 100644
--- a/python/tools/call_subordinate.py
+++ b/python/tools/call_subordinate.py
@@ -2,13 +2,19 @@
 from python.helpers.tool import Tool, Response
 
 class Delegation(Tool):
-
-    async def execute(self, message="", reset="", **kwargs):
+    async def execute(self, message="", reset="", role="", **kwargs):
         # create subordinate agent using the data object on this agent and set superior agent to his data object
         if self.agent.get_data("subordinate") is None or str(reset).lower().strip() == "true":
-            subordinate = Agent(self.agent.number+1, self.agent.config, self.agent.context)
+            # Create new subordinate with specified role
+            subordinate = Agent(
+                number=self.agent.number + 1,
+                config=self.agent.config,
+                context=self.agent.context,
+                role=role  # Pass through the role argument
+            )
             subordinate.set_data("superior", self.agent)
-            self.agent.set_data("subordinate", subordinate) 
+            self.agent.set_data("subordinate", subordinate)
+
         # run subordinate agent message loop
         subordinate: Agent = self.agent.get_data("subordinate")
-        return Response( message= await subordinate.monologue(message), break_loop=False)
\ No newline at end of file
+        return Response(message=await subordinate.monologue(message), break_loop=False)
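
Illustrative usage of the whole feature (a sketch, not part of the patches
above; it assumes the tool_name/tool_args JSON shape parsed by
Agent.process_tools and the role names defined in initialize.py):

    # An agent delegates to a role-specialized subordinate by emitting a
    # call_subordinate tool request in its reply:
    #
    #   {
    #     "tool_name": "call_subordinate",
    #     "tool_args": {
    #       "role": "coder",      # selects coder_config from subordinate_configs
    #       "reset": "true",      # spawn a fresh subordinate
    #       "message": "Write unit tests for models.get_huggingface_chat."
    #     }
    #   }
    #
    # The same wiring can be exercised directly in Python:

    import asyncio
    import initialize
    from agent import Agent, AgentContext

    async def main():
        config = initialize.initialize()
        context = AgentContext(config)
        agent0 = Agent(number=0, config=config, context=context)

        # create_subordinate("coder") applies the coder AgentAPIConfig, so the
        # subordinate chats with its own chat_model/utility_model while sharing
        # agent0's embeddings model and context.
        coder = await agent0.create_subordinate(role="coder")
        reply = await coder.monologue("Write unit tests for models.get_huggingface_chat.")
        print(reply)

    asyncio.run(main())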