Fixes for async tools & doc updates (#93)
whimo authored Oct 17, 2024
1 parent 4b4f046 commit 0f49c45
Showing 5 changed files with 143 additions and 41 deletions.
3 changes: 0 additions & 3 deletions docs/source/quickstart.nblink

This file was deleted.

87 changes: 87 additions & 0 deletions docs/source/quickstart.rst
@@ -0,0 +1,87 @@
Quickstart
==========

This is a brief introduction to motleycrew.

For a working example of agents, tools, crew, and SimpleTask, check out the :doc:`blog with images <examples/blog_with_images>`.

For a working example of custom tasks that fully utilize the knowledge graph backend, check out the :doc:`research agent <examples/research_agent>`.

Agents and tools
----------------

Motleycrew provides thin wrappers for all the common agent frameworks: Langchain, LlamaIndex, CrewAI, and Autogen (please let us know if you want any others added!).
It also provides thin wrappers for Langchain and LlamaIndex tools, allowing you to use any of these tools with any of these agents.

MotleyCrew also supports **delegation**: you can simply give any agent as a tool to any other agent.
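
For illustration, here is a rough sketch of delegation. The ``ReActToolCallingMotleyAgent`` class name, its constructor arguments, and the import paths are assumptions about typical motleycrew usage, not something shown on this page:

.. code-block:: python

    from langchain_community.tools import DuckDuckGoSearchRun  # any Langchain tool

    from motleycrew.agents.langchain import ReActToolCallingMotleyAgent  # assumed import path

    # A specialist agent that uses a Langchain tool (wrapped automatically)
    researcher = ReActToolCallingMotleyAgent(
        name="researcher",
        description="Researches a topic on the web.",
        tools=[DuckDuckGoSearchRun()],
    )

    # Delegation: the manager gets the researcher agent as one of its tools
    manager = ReActToolCallingMotleyAgent(
        name="manager",
        description="Coordinates research and writes the final answer.",
        tools=[researcher],
    )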

All the wrappers for tools and agents implement the Runnable interface, so you can use them as-is in LCEL and Langgraph code.

Output handlers (aka return_direct)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

An **output handler** is a tool that the agent uses for submitting the final output instead of returning it raw. Besides defining a schema for the output, the output handler lets you implement any validation logic inside it, including agent-based validation. If the agent returns invalid output, you can raise an exception that will be returned to the agent so that it can retry.

Essentially, an output handler is a tool that returns its output directly to the user, thus finishing the agent execution. This behavior is enabled by setting the ``return_direct=True`` argument for the tool. Unlike other frameworks, MotleyCrew allows an agent to have multiple output handlers, from which it can choose one.

MotleyCrew also supports **forced output handlers**. This means that the agent will only be able to return output via an output handler, and not directly. This is useful if you want to ensure that the agent only returns output in a specific format.

See our usage examples with a :doc:`simple validator <examples/validating_agent_output>` and an :doc:`advanced output handler with multiple fields <examples/advanced_output_handling>`.
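
As a rough sketch, an output handler with custom validation might look like this (it uses the ``MotleyTool`` base class described in the next section; the import paths and field names are illustrative assumptions):

.. code-block:: python

    from pydantic import BaseModel, Field

    from motleycrew.common.exceptions import InvalidOutput
    from motleycrew.tools import MotleyTool  # assumed import path


    class ReportOutput(BaseModel):
        report: str = Field(description="The final report text.")


    class ReportOutputHandler(MotleyTool):
        def __init__(self):
            super().__init__(
                name="return_report",
                description="Submit the final report to the user.",
                args_schema=ReportOutput,
                return_direct=True,  # the output ends the agent execution
            )

        def run(self, report: str) -> str:
            if "conclusion" not in report.lower():
                # Reflected back to the agent so it can revise the report and retry
                raise InvalidOutput("The report must contain a conclusion.")
            return report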

MotleyTool
^^^^^^^^^^

A tool in motleycrew, like in other frameworks, is basically a function that takes an input and returns an output.
It is called a tool in the sense that it is usually used by an agent to perform a specific action.
Besides the function itself, a tool also contains an input schema which describes the input format to the LLM.

``MotleyTool`` is the base class for all tools in motleycrew. It is a subclass of ``Runnable`` that adds some additional features to the tool, along with necessary adapters and converters.

If you pass a tool from a supported framework (currently Langchain, LlamaIndex, and CrewAI) to a motleycrew agent, it will be automatically converted. If you want to have control over this, e.g. to customize tool params, you can do it manually.

.. code-block:: python

    motley_tool = MotleyTool.from_supported_tool(my_tool)

It is also possible to define a custom tool using the ``MotleyTool`` base class, overriding the ``run`` method. This is especially useful if you need access to context such as the calling agent or its last input, e.g. for validation.

.. code-block:: python

    class MyTool(MotleyTool):
        def run(self, some_input: str) -> str:
            return f"Received {some_input} from agent {self.agent} with last input {self.agent_input}"

Tools can be executed asynchronously, either directly or via an asynchronous agent. By default, the async version simply runs the sync version in a separate thread.
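
For example, here is a minimal sketch of a natively asynchronous tool (the ``aiohttp`` dependency, the schema, and the import path are assumptions for illustration):

.. code-block:: python

    import aiohttp
    from pydantic import BaseModel, Field

    from motleycrew.tools import MotleyTool  # assumed import path


    class FetchInput(BaseModel):
        url: str = Field(description="URL to fetch.")


    class AsyncFetchTool(MotleyTool):
        def __init__(self):
            super().__init__(
                name="fetch_url",
                description="Fetch the contents of a URL.",
                args_schema=FetchInput,
            )

        async def arun(self, url: str) -> str:
            # Overriding arun instead of run makes the tool natively async
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.text()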

MotleyTool can reflect exceptions that are raised inside it back to the agent, which can then retry the tool call. You can pass a list of exception classes to the ``exceptions_to_reflect`` argument in the constructor (or even pass the ``Exception`` class to reflect everything).
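
For example (``my_langchain_tool`` is a placeholder for any Langchain tool):

.. code-block:: python

    motley_tool = MotleyTool.from_langchain_tool(
        my_langchain_tool,
        exceptions_to_reflect=[ValueError],
    )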

Crew and tasks
--------------

The other two key concepts in motleycrew are crew and tasks. The crew is the orchestrator for tasks, and must be passed to all tasks at creation; tasks can be connected into a DAG using the ``>>`` operator, for example ``TaskA >> TaskB``. This means that ``TaskB`` will not be started before ``TaskA`` is complete, and will be given ``TaskA``'s output.

Once all tasks and their relationships have been set up, it all can be run via ``crew.run()``, which returns a list of the executed ``TaskUnits`` (see below for details).

SimpleTask
^^^^^^^^^^

``SimpleTask`` is a basic implementation of the ``Task`` API. It only requires a crew, a description, and an agent. When it's executed, the description is combined with the output of any upstream tasks and passed on to the agent, and the agent's output becomes the task's output.
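
A rough sketch of wiring this together (the agents are placeholders, and the ``name`` argument is an assumption):

.. code-block:: python

    from motleycrew import MotleyCrew
    from motleycrew.tasks import SimpleTask

    crew = MotleyCrew()

    write_task = SimpleTask(
        crew=crew, name="write", description="Write a draft blog post", agent=writer_agent
    )
    edit_task = SimpleTask(
        crew=crew, name="edit", description="Edit the draft", agent=editor_agent
    )

    write_task >> edit_task  # edit_task starts after write_task and receives its output

    done_units = crew.run()  # returns the list of executed task units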

For a working illustration of all the concepts so far, see the :doc:`blog with images <examples/blog_with_images>` example.

Knowledge graph backend and custom tasks
----------------------------------------

The functionality so far is convenient, allowing us to mix all the popular agents and tools, but otherwise fairly vanilla, differing little from, for example, CrewAI's semantics. Fortunately, the above introduction just scratched the surface of the motleycrew ``Task`` API.

In motleycrew, a task is basically a set of rules describing how to perform actions. It provides a **worker** (e.g. an agent) and sets of input data called **task units**. This allows defining workflows of any complexity concisely using crew semantics. For a deeper dive, check out the page on :doc:`key concepts <key_concepts>`.

The crew queries and dispatches available task units in a loop, managing task states using an embedded :doc:`knowledge graph <knowledge_graph>`.

This dispatch method easily supports different execution backends, from synchronous to asyncio, threaded, etc.

Example: Recursive question-answering in the research agent
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The motleycrew architecture described above makes it easy to generate task units on the fly when needed. An example of the power of this approach is the :doc:`research agent <examples/research_agent>`, which dynamically generates new questions based on the context retrieved for previous questions.
This example also shows how workers can collaborate via the shared knowledge graph, storing all necessary data in a way that is natural to the task.
10 changes: 7 additions & 3 deletions motleycrew/applications/customer_support/ray_serve_app.py
@@ -58,6 +58,10 @@ async def root(self):
@app.websocket("/ws")
async def websocket_endpoint(self, websocket: WebSocket):
await websocket.accept()
await websocket.send_json(
{"type": "agent_message", "content": "Hello! How can I help you?"}
)

communication_interface = WebSocketCommunicationInterface(websocket)

context = SupportAgentContext(self.graph_store, communication_interface)
@@ -75,11 +79,11 @@ async def websocket_endpoint(self, websocket: WebSocket):
try:
while True:
data = await websocket.receive_text()
resolution, escalate = await agent.ainvoke({"prompt": data})
if escalate:
resolution = await agent.ainvoke({"prompt": data})
if resolution.additional_kwargs.get("escalate"):
await communication_interface.escalate_to_human_agent()
else:
await communication_interface.resolve_issue(resolution)
await communication_interface.resolve_issue(resolution.content)
except Exception as e:
logger.error(f"WebSocket error: {e}")

30 changes: 20 additions & 10 deletions motleycrew/applications/customer_support/support_agent.py
@@ -1,9 +1,9 @@
import asyncio
from dataclasses import dataclass, field
from dotenv import load_dotenv
from pathlib import Path
from typing import List, Optional, Sequence, Tuple
from typing import List, Optional, Sequence

from dotenv import load_dotenv
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.tools import format_to_tool_messages
from langchain.agents.output_parsers.tools import ToolsAgentOutputParser
@@ -15,12 +15,12 @@
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field

from motleycrew.agents.langchain import LangchainMotleyAgent
from motleycrew.applications.customer_support.communication import (
CommunicationInterface,
DummyCommunicationInterface,
)
from motleycrew.applications.customer_support.issue_tree import IssueData, IssueNode
from motleycrew.agents.langchain import LangchainMotleyAgent
from motleycrew.common import LLMFramework
from motleycrew.common.exceptions import InvalidOutput
from motleycrew.common.llms import init_llm
@@ -139,7 +139,6 @@ def __init__(self, context: SupportAgentContext):
description="Tool that can send a message to the customer and receive a response. "
"Use it if you need to inquire additional details from the customer or to propose a solution.",
args_schema=CustomerChatInput,
is_async=True,
)
self.context = context

@@ -169,23 +168,24 @@ def __init__(self, context: SupportAgentContext):
)
self.context = context

def run(self, resolution: Optional[str] = None, escalate: bool = False) -> Tuple[str, bool]:
def run(self, resolution: Optional[str] = None, escalate: bool = False) -> AIMessage:
"""
Args:
resolution: Resolution to the issue.
escalate: Whether to escalate the issue to a human agent.
Returns:
Tuple[str, bool]: The resolution to the issue and a boolean indicating whether the issue was escalated.
AIMessage: The resolution to the issue with `escalate` value in the additional kwargs.
"""
if escalate:
return resolution, True
content = f"I am escalating this issue to a human agent. Resolution: {resolution}"
return AIMessage(content=content, additional_kwargs={"escalate": True})
else:
if not resolution:
raise InvalidOutput("Resolution must be provided when not escalating.")
if not self.context.viewed_issues:
raise InvalidOutput("You must view at least some past issues before resolving.")
return resolution, False
return AIMessage(content=resolution, additional_kwargs={"escalate": False})


SUPPORT_AGENT_PROMPT = """You are a customer support agent. Your goal is to answer customer's questions.
@@ -328,15 +328,25 @@ async def main():

print("Starting Customer Support Agent Demo")

customer_query = "I am having trouble logging in."
customer_query = "I forgot my password."
print(f"\nCustomer: {customer_query}")

response = await agent.ainvoke(
{
"prompt": customer_query,
}
)
print(f"Agent: {response}")
print(f"Agent: {response.content}")

followup_query = "What if I forgot my email?"
print(f"\nCustomer: {followup_query}")

response = await agent.ainvoke(
{
"prompt": followup_query,
}
)
print(f"Agent: {response.content}")

print("\nDemo completed.")

54 changes: 29 additions & 25 deletions motleycrew/tools/tool.py
@@ -75,7 +75,6 @@ def __init__(
return_direct: bool = False,
exceptions_to_reflect: Optional[List[Type[Exception]]] = None,
retry_config: Optional[RetryConfig] = None,
is_async: bool = False,
):
"""Initialize the MotleyTool.
@@ -87,10 +86,7 @@
return_direct: If True, the tool's output will be returned directly to the user.
exceptions_to_reflect: List of exceptions to reflect back to the agent.
retry_config: Configuration for retry behavior. If None, exceptions will not be retried.
is_async: Indicates whether the tool is asynchronous.
"""
self.is_async = is_async

if tool is None:
assert name is not None
assert description is not None
@@ -106,10 +102,10 @@
self.exceptions_to_reflect = [InvalidOutput, *self.exceptions_to_reflect]

self.retry_config = retry_config or RetryConfig(max_retries=0, exceptions_to_retry=())

self._patch_tool_run()
if self.is_async:
self._patch_tool_arun()
else:
self._patch_tool_run()

self.agent: Optional[MotleyAgentAbstractParent] = None
self.agent_input: Optional[dict] = None
@@ -135,6 +131,11 @@ def args_schema(self):
"""Schema of the tool arguments."""
return self.tool.args_schema

@property
def is_async(self):
"""Check if the tool is asynchronous."""
return getattr(self.tool, "coroutine", None) is not None

def _patch_tool_run(self):
"""Patch the tool run method to implement retry logic and reflect exceptions."""

@@ -214,20 +215,15 @@ async def ainvoke(
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Any:
if self.is_async:
return await self.tool.ainvoke(input=input, config=config, **kwargs)
else:
# Fallback to synchronous invoke if no async method is available
return await asyncio.to_thread(self.invoke, input, config, **kwargs)
return await self.tool.ainvoke(input=input, config=config, **kwargs)


def invoke(
self,
input: Union[str, Dict],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Any:
if self.is_async:
raise RuntimeError("Cannot use invoke() with async tools. Use ainvoke() instead.")
return self.tool.invoke(input=input, config=config, **kwargs)

def run(self, *args, **kwargs):
@@ -237,32 +233,40 @@ async def arun(self, *args, **kwargs):
pass

def _tool_from_run_method(self, name: str, description: str, args_schema: BaseModel):
if self.is_async:
return StructuredTool.from_function(
name=name,
description=description,
args_schema=args_schema,
coroutine=self.arun,
)
else:
return StructuredTool.from_function(
name=name, description=description, args_schema=args_schema, func=self.run
func = None
coroutine = None

if self.__class__.run != MotleyTool.run:
func = self.run
if self.__class__.arun != MotleyTool.arun:
coroutine = self.arun

if func is None and coroutine is None:
raise Exception(
"At least one of run and arun methods must be overridden in MotleyTool if not "
"constructing from a supported tool instance."
)

return StructuredTool.from_function(
name=name,
description=description,
args_schema=args_schema,
func=func,
coroutine=coroutine,
)

@staticmethod
def from_langchain_tool(
langchain_tool: BaseTool,
return_direct: bool = False,
exceptions_to_reflect: Optional[List[Exception]] = None,
retry_config: Optional[RetryConfig] = None,
) -> "MotleyTool":
is_async = getattr(langchain_tool, "coroutine", None) is not None
return MotleyTool(
tool=langchain_tool,
return_direct=return_direct,
exceptions_to_reflect=exceptions_to_reflect,
retry_config=retry_config,
is_async=is_async,
)

@staticmethod
