diff --git a/docs/source/quickstart.nblink b/docs/source/quickstart.nblink deleted file mode 100644 index 6533056e..00000000 --- a/docs/source/quickstart.nblink +++ /dev/null @@ -1,3 +0,0 @@ -{ - "path": "../../examples/Quickstart.ipynb" -} \ No newline at end of file diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst new file mode 100644 index 00000000..6d0a3b9a --- /dev/null +++ b/docs/source/quickstart.rst @@ -0,0 +1,87 @@ +Quickstart +========== + +This is a brief introduction to motleycrew. + +For a working example of agents, tools, crew, and SimpleTask, check out the :doc:`blog with images `. + +For a working example of custom tasks that fully utilize the knowledge graph backend, check out the :doc:`research agent `. + +Agents and tools +---------------- + +Motleycrew provides thin wrappers for all the common agent frameworks: Langchain, LlamaIndex, CrewAI, and Autogen (please let us know if you want any others added!). +It also provides thin wrappers for Langchain and LlamaIndex tools, allowing you to use any of these tools with any of these agents. + +MotleyCrew also supports **delegation**: you can simply give any agent as a tool to any other agent. + +All the wrappers for tools and agents implement the Runnable interface, so you can use them as-is in LCEL and Langgraph code. + +Output handlers (aka return_direct) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +An **output handler** is a tool that the agent uses for submitting the final output instead of returning it raw. Besides defining a schema for the output, the output handler enables you to implement any validation logic inside it, including agent-based. If your agent returns invalid output, you can raise an exception that will be returned to the agent so that it can retry. + +Essentially, an output handler is a tool that returns its output directly to the user, thus finishing the agent execution. This behavior is enabled by setting the ``return_direct=True`` argument for the tool. Unlike other frameworks, MotleyCrew allows to have multiple output handlers for one agent, from which the agent can choose one. + +MotleyCrew also supports **forced output handlers**. This means that the agent will only be able to return output via an output handler, and not directly. This is useful if you want to ensure that the agent only returns output in a specific format. + +See our usage examples with a :doc:`simple validator ` and an :doc:`advanced output handler with multiple fields `. + +MotleyTool +^^^^^^^^^^ + +A tool in motleycrew, like in other frameworks, is basically a function that takes an input and returns an output. +It is called a tool in the sense that it is usually used by an agent to perform a specific action. +Besides the function itself, a tool also contains an input schema which describes the input format to the LLM. + +``MotleyTool`` is the base class for all tools in motleycrew. It is a subclass of ``Runnable`` that adds some additional features to the tool, along with necessary adapters and converters. + +If you pass a tool from a supported framework (currently Langchain, LlamaIndex, and CrewAI) to a motleycrew agent, it will be automatically converted. If you want to have control over this, e.g. to customize tool params, you can do it manually. + +.. code-block:: python + + motley_tool = MotleyTool.from_supported_tool(my_tool) + +It is also possible to define a custom tool using the ``MotleyTool`` base class, overriding the ``run`` method. This is especially useful if you want to access context such as the caller agent or its last input, which can be useful for validation. + +.. code-block:: python + + class MyTool(MotleyTool): + def run(self, some_input: str) -> str: + return f"Received {some_input} from agent {self.agent} with last input {self.agent_input}" + +Tools can be executed asynchronously, either directly of via by an asynchronous agent. By default, the async version will just run the sync version in a separate thread + +MotleyTool can reflect exceptions that are raised inside it back to the agent, which can then retry the tool call. You can pass a list of exception classes to the ``exceptions_to_reflect`` argument in the constructor (or even pass the ``Exception`` class to reflect everything). + +Crew and tasks +-------------- + +The other two key concepts in motleycrew are crew and tasks. The crew is the orchestrator for tasks, and must be passed to all tasks at creation; tasks can be connected into a DAG using the ``>>`` operator, for example ``TaskA >> TaskB``. This means that ``TaskB`` will not be started before ``TaskA`` is complete, and will be given ``TaskA``'s output. + +Once all tasks and their relationships have been set up, it all can be run via ``crew.run()``, which returns a list of the executed ``TaskUnits`` (see below for details). + +SimpleTask +^^^^^^^^^^ + +``SimpleTask`` is a basic implementation of the ``Task`` API. It only requires a crew, a description, and an agent. When it's executed, the description is combined with the output of any upstream tasks and passed on to the agent, and the agent's output is the tasks's output. + +For a working illustration of all the concepts so far, see the :doc:`blog with images ` example. + +Knowledge graph backend and custom tasks +---------------------------------------- + +The functionality so far is convenient, allowing us to mix all the popular agents and tools, but otherwise fairly vanilla, little different from, for example, the CrewAI semantics. Fortunately, the above introduction just scratched the surface of the motleycrew ``Task`` API. + +In motleycrew, a task is basically a set of rules describing how to perform actions. It provides a **worker** (e.g. an agent) and sets of input data called **task units**. This allows defining workflows of any complexity concisely using crew semantics. For a deeper dive, check out the page on :doc:`key concepts `. + +The crew queries and dispatches available task units in a loop, managing task states using an embedded :doc:`knowledge graph `. + +This dispatch method easily supports different execution backends, from synchronous to asyncio, threaded, etc. + +Example: Recursive question-answering in the research agent +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Motleycrew architecture described above easily allows to generate task units on the fly, if needed. An example of the power of this approach is the :doc:`research agent ` that dynamically generates new questions based on retrieved context for previous questions. +This example also shows how workers can collaborate via the shared knowledge graph, storing all necessary data in a way that is natural to the task. diff --git a/motleycrew/applications/customer_support/ray_serve_app.py b/motleycrew/applications/customer_support/ray_serve_app.py index be11fc5d..2ee10806 100644 --- a/motleycrew/applications/customer_support/ray_serve_app.py +++ b/motleycrew/applications/customer_support/ray_serve_app.py @@ -58,6 +58,10 @@ async def root(self): @app.websocket("/ws") async def websocket_endpoint(self, websocket: WebSocket): await websocket.accept() + await websocket.send_json( + {"type": "agent_message", "content": "Hello! How can I help you?"} + ) + communication_interface = WebSocketCommunicationInterface(websocket) context = SupportAgentContext(self.graph_store, communication_interface) @@ -75,11 +79,11 @@ async def websocket_endpoint(self, websocket: WebSocket): try: while True: data = await websocket.receive_text() - resolution, escalate = await agent.ainvoke({"prompt": data}) - if escalate: + resolution = await agent.ainvoke({"prompt": data}) + if resolution.additional_kwargs.get("escalate"): await communication_interface.escalate_to_human_agent() else: - await communication_interface.resolve_issue(resolution) + await communication_interface.resolve_issue(resolution.content) except Exception as e: logger.error(f"WebSocket error: {e}") diff --git a/motleycrew/applications/customer_support/support_agent.py b/motleycrew/applications/customer_support/support_agent.py index 04f20509..c047c378 100644 --- a/motleycrew/applications/customer_support/support_agent.py +++ b/motleycrew/applications/customer_support/support_agent.py @@ -1,9 +1,9 @@ import asyncio from dataclasses import dataclass, field -from dotenv import load_dotenv from pathlib import Path -from typing import List, Optional, Sequence, Tuple +from typing import List, Optional, Sequence +from dotenv import load_dotenv from langchain.agents import AgentExecutor from langchain.agents.format_scratchpad.tools import format_to_tool_messages from langchain.agents.output_parsers.tools import ToolsAgentOutputParser @@ -15,12 +15,12 @@ from langchain_core.tools import BaseTool from pydantic import BaseModel, Field +from motleycrew.agents.langchain import LangchainMotleyAgent from motleycrew.applications.customer_support.communication import ( CommunicationInterface, DummyCommunicationInterface, ) from motleycrew.applications.customer_support.issue_tree import IssueData, IssueNode -from motleycrew.agents.langchain import LangchainMotleyAgent from motleycrew.common import LLMFramework from motleycrew.common.exceptions import InvalidOutput from motleycrew.common.llms import init_llm @@ -139,7 +139,6 @@ def __init__(self, context: SupportAgentContext): description="Tool that can send a message to the customer and receive a response. " "Use it if you need to inquire additional details from the customer or to propose a solution.", args_schema=CustomerChatInput, - is_async=True, ) self.context = context @@ -169,23 +168,24 @@ def __init__(self, context: SupportAgentContext): ) self.context = context - def run(self, resolution: Optional[str] = None, escalate: bool = False) -> Tuple[str, bool]: + def run(self, resolution: Optional[str] = None, escalate: bool = False) -> AIMessage: """ Args: resolution: Resolution to the issue. escalate: Whether to escalate the issue to a human agent. Returns: - Tuple[str, bool]: The resolution to the issue and a boolean indicating whether the issue was escalated. + AIMessage: The resolution to the issue with `escalate` value in the additional kwargs. """ if escalate: - return resolution, True + content = f"I am escalating this issue to a human agent. Resolution: {resolution}" + return AIMessage(content=content, additional_kwargs={"escalate": True}) else: if not resolution: raise InvalidOutput("Resolution must be provided when not escalating.") if not self.context.viewed_issues: raise InvalidOutput("You must view at least some past issues before resolving.") - return resolution, False + return AIMessage(content=resolution, additional_kwargs={"escalate": False}) SUPPORT_AGENT_PROMPT = """You are a customer support agent. Your goal is to answer customer's questions. @@ -328,7 +328,7 @@ async def main(): print("Starting Customer Support Agent Demo") - customer_query = "I am having trouble logging in." + customer_query = "I forgot my password." print(f"\nCustomer: {customer_query}") response = await agent.ainvoke( @@ -336,7 +336,17 @@ async def main(): "prompt": customer_query, } ) - print(f"Agent: {response}") + print(f"Agent: {response.content}") + + followup_query = "What if I forgot my email?" + print(f"\nCustomer: {followup_query}") + + response = await agent.ainvoke( + { + "prompt": followup_query, + } + ) + print(f"Agent: {response.content}") print("\nDemo completed.") diff --git a/motleycrew/tools/tool.py b/motleycrew/tools/tool.py index b4e0c850..5e1bfe0f 100644 --- a/motleycrew/tools/tool.py +++ b/motleycrew/tools/tool.py @@ -75,7 +75,6 @@ def __init__( return_direct: bool = False, exceptions_to_reflect: Optional[List[Type[Exception]]] = None, retry_config: Optional[RetryConfig] = None, - is_async: bool = False, ): """Initialize the MotleyTool. @@ -87,10 +86,7 @@ def __init__( return_direct: If True, the tool's output will be returned directly to the user. exceptions_to_reflect: List of exceptions to reflect back to the agent. retry_config: Configuration for retry behavior. If None, exceptions will not be retried. - is_async: Indicates whether the tool is asynchronous. """ - self.is_async = is_async - if tool is None: assert name is not None assert description is not None @@ -106,10 +102,10 @@ def __init__( self.exceptions_to_reflect = [InvalidOutput, *self.exceptions_to_reflect] self.retry_config = retry_config or RetryConfig(max_retries=0, exceptions_to_retry=()) + + self._patch_tool_run() if self.is_async: self._patch_tool_arun() - else: - self._patch_tool_run() self.agent: Optional[MotleyAgentAbstractParent] = None self.agent_input: Optional[dict] = None @@ -135,6 +131,11 @@ def args_schema(self): """Schema of the tool arguments.""" return self.tool.args_schema + @property + def is_async(self): + """Check if the tool is asynchronous.""" + return getattr(self.tool, "coroutine", None) is not None + def _patch_tool_run(self): """Patch the tool run method to implement retry logic and reflect exceptions.""" @@ -214,11 +215,8 @@ async def ainvoke( config: Optional[RunnableConfig] = None, **kwargs: Any, ) -> Any: - if self.is_async: - return await self.tool.ainvoke(input=input, config=config, **kwargs) - else: - # Fallback to synchronous invoke if no async method is available - return await asyncio.to_thread(self.invoke, input, config, **kwargs) + return await self.tool.ainvoke(input=input, config=config, **kwargs) + def invoke( self, @@ -226,8 +224,6 @@ def invoke( config: Optional[RunnableConfig] = None, **kwargs: Any, ) -> Any: - if self.is_async: - raise RuntimeError("Cannot use invoke() with async tools. Use ainvoke() instead.") return self.tool.invoke(input=input, config=config, **kwargs) def run(self, *args, **kwargs): @@ -237,18 +233,28 @@ async def arun(self, *args, **kwargs): pass def _tool_from_run_method(self, name: str, description: str, args_schema: BaseModel): - if self.is_async: - return StructuredTool.from_function( - name=name, - description=description, - args_schema=args_schema, - coroutine=self.arun, - ) - else: - return StructuredTool.from_function( - name=name, description=description, args_schema=args_schema, func=self.run + func = None + coroutine = None + + if self.__class__.run != MotleyTool.run: + func = self.run + if self.__class__.arun != MotleyTool.arun: + coroutine = self.arun + + if func is None and coroutine is None: + raise Exception( + "At least one of run and arun methods must be overridden in MotleyTool if not " + "constructing from a supported tool instance." ) + return StructuredTool.from_function( + name=name, + description=description, + args_schema=args_schema, + func=func, + coroutine=coroutine, + ) + @staticmethod def from_langchain_tool( langchain_tool: BaseTool, @@ -256,13 +262,11 @@ def from_langchain_tool( exceptions_to_reflect: Optional[List[Exception]] = None, retry_config: Optional[RetryConfig] = None, ) -> "MotleyTool": - is_async = getattr(langchain_tool, "coroutine", None) is not None return MotleyTool( tool=langchain_tool, return_direct=return_direct, exceptions_to_reflect=exceptions_to_reflect, retry_config=retry_config, - is_async=is_async, ) @staticmethod