From 01fe9400ce5c02b1443b4807e67bd2f6c9a449b9 Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 20 Nov 2024 10:15:03 +0800 Subject: [PATCH] update --- examples/data_interpreter/di_agents.py | 52 ++- examples/data_interpreter/di_multiagent.py | 411 +++++++++++---------- 2 files changed, 269 insertions(+), 194 deletions(-) diff --git a/examples/data_interpreter/di_agents.py b/examples/data_interpreter/di_agents.py index 2ba9b5613..1a0dfe984 100644 --- a/examples/data_interpreter/di_agents.py +++ b/examples/data_interpreter/di_agents.py @@ -204,7 +204,7 @@ def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg: - If you think code or tools are helpful for verification, use `execute_python_code` and/or other tools available to do verification. - Do not simply trust the claim in `result`. VERIFY IT. - If the information in `result` cannot solve `current_sub_task`, Do NOT attempt to fix it. Report it IMMEDIATELY. You job is just to do the verification. - - If the given result can succesfully solve `current_sub_task`, ALWAYS output 'True' at the very end of your response; otherwise, explain why the given result cannot succesfully solve `current_sub_task` and output 'False'. + - If the given result can succesfully solve `current_sub_task`, ALWAYS output 'True' at the very end of your response; otherwise, output why the given result cannot succesfully solve `current_sub_task` and followed by 'False'. - DO NOT call `finish` before the entire verification process is completed. After the entire verification is completed, use `finish` tool IMMEDIATELY.""" msg = Msg( @@ -350,3 +350,53 @@ def _replanning(self, task: str) -> List[Dict[str, Any]]: parser = MarkdownJsonObjectParser() parsed_response: List[Dict[str, Any]] = parser.parse(response) return parsed_response.parsed + + def _decompose_task( + self, + task: str, + max_tasks: int = 5, + ) -> List[Dict[str, Any]]: + """ + Decompose a complex subtask into smaller, more manageable subtasks. + + Args: + task (str): The task to be decomposed + max_tasks (int, optional): Maximum number of subtasks allowed. Defaults to 5. + + Returns: + List[Dict[str, Any]]: List of decomposed subtasks as dictionaries + """ + message = [ + { + "role": "user", + "content": f""" + Task: {task} + - Given the task above which was determined to be too complex, break it down into smaller, more manageable subtasks. + - Every subtask should be solvable through either executing code or using tools. The information of all the tools available are here: + {self.service_toolkit.tools_instruction} + - The subtask should not be too simple. If a task can be solved with a single block of code in one go, it should not be broken down further. + - Prioritize using other tools over `execute_python_code` and take the tools available into consideration when decomposing the task. + - Provide a JSON structure with the following format for the decomposition: + ```json + [ + {{ + "task_id": str = "unique identifier for a task in plan, can be an ordinal", + "dependent_task_ids": list[str] = "ids of tasks prerequisite to this task", + "instruction": "what you should do in this task, one short phrase or sentence", + "task_type": "type of this task, should be one of Available Task Types", + "task_type": "type of this task, should be one of Available Task Types", + "tool_info": "recommended tool(s)' name(s) for solving this task", + }}, + ... + ] + ``` + - The maximum number of subtasks allowed is {max_tasks}. + """, + }, + ] + + response_text: str = self.model(message).text.strip() + response = ModelResponse(text=response_text) + parser = MarkdownJsonObjectParser() + parsed_response: List[Dict[str, Any]] = parser.parse(response) + return parsed_response.parsed diff --git a/examples/data_interpreter/di_multiagent.py b/examples/data_interpreter/di_multiagent.py index 248f901f4..5759c4c11 100644 --- a/examples/data_interpreter/di_multiagent.py +++ b/examples/data_interpreter/di_multiagent.py @@ -6,6 +6,7 @@ """ import csv import os +import copy from typing import Any, List, Dict, Optional from dotenv import load_dotenv, find_dotenv from di_agents import ( @@ -28,6 +29,17 @@ from agentscope.service.service_response import ServiceResponse from agentscope.service.service_status import ServiceExecStatus +# Global variables for agents with type annotations +planner_agent: PlannerAgent +solver_agent: ReActAgent +verifier_agent: VerifierAgent +synthesizer_agent: SynthesizerAgent +replanner_agent: ReplanningAgent + +# Global variables for failure tracking +total_failure_count = 0 +max_total_failures = 3 # Adjust as needed + def read_csv_file(file_path: str) -> ServiceResponse: """ @@ -107,58 +119,55 @@ def write_csv_file( os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY") -STRUCTUAL_PROMPT = """ - # overall_task: {overall_task} +STRUCTURAL_PROMPT = """ + # overall_task: {overall_task} - # solved_sub_tasks: {solved_sub_tasks} + # solved_sub_tasks: {solved_sub_tasks} - # current_sub_task: {current_sub_task} + # current_sub_task: {current_sub_task} - # Instruction - - Conditioning on `overall_task` and `solved_sub_tasks`, solve `current_sub_task` with the appropriate tools provided. Note that you should only use `overall_task` and `solved_sub_tasks` as context as opposed to solving them. DO NOT attempt to solve `overall_task` or `solved_sub_tasks`. - - When using tools, ALWAYS prioritize using the tool mentioned in `tool_info` over other tool or code for solving `current_sub_task`. - - While some concise thoughts are helpful, code is required, unless other tools are used. If certain python libraries are not installed, use `execute_shell_command` to install them. - - At each step, if some data is fetched/generated, it is a good practice to save it. + # Instruction + - Conditioning on `overall_task` and `solved_sub_tasks`, solve `current_sub_task` with the appropriate tools provided. Note that you should only use `overall_task` and `solved_sub_tasks` as context as opposed to solving them. DO NOT attempt to solve `overall_task` or `solved_sub_tasks`. + - When using tools, ALWAYS prioritize using the tool mentioned in `tool_info` over other tool or code for solving `current_sub_task`. + - While some concise thoughts are helpful, code is required, unless other tools are used. If certain python libraries are not installed, use `execute_shell_command` to install them. + - At each step, if some data is fetched/generated, it is a good practice to save it. - # Output Instruction - - Always output one and only one code block in your response. The code block must be self-contained, i.e., does not rely on previously generated code to be executed successfully, because the execution environments do not persist between calls of `execute_python_code`. Always use print statement on the final solution. E.g., if `res` is the final output, use `print(res)` at the end of your code. Output the code itself if the task at hand is to generate that code. After that, use `execute_python_code` to execute your code. Based on the result from code execution or tool using, determine if `current_sub_task` is solved. - - After `current_sub_task` is solved, return explicitly the result(s) for `current_sub_task` that is/are needed in the subsequent subtasks. If certain code are needed for the subsequent tasks, OUTPUT THE COMPLETE CODE. If the code is long, save it in txt or json format, and output the path for next round's use. If the result(s) contain(s) a lof of data, save the result(s) locally, output the path before proceed. - - DO NOT USE `finish` tool before executing the code if code execution is required. If the result involves a lot of data, save the data as csv, txt, json file(s) etc. for the ease of processing in the following subtasks. - """ + # Output Instruction + - Always output one and only one code block in your response. The code block must be self-contained, i.e., does not rely on previously generated code to be executed successfully, because the execution environments do not persist between calls of `execute_python_code`. Always use print statement on the final solution. E.g., if `res` is the final output, use `print(res)` at the end of your code. Output the code itself if the task at hand is to generate that code. After that, use `execute_python_code` to execute your code. Based on the result from code execution or tool using, determine if `current_sub_task` is solved. + - After `current_sub_task` is solved, return explicitly the result(s) for `current_sub_task` that is/are needed in the subsequent subtasks. If certain code are needed for the subsequent tasks, OUTPUT THE COMPLETE CODE. If the code is long, save it in txt or json format, and output the path for next round's use. If the result(s) contain(s) a lot of data, save the result(s) locally, output the path before proceed. + - DO NOT USE `finish` tool before executing the code if code execution is required. If the result involves a lot of data, save the data as csv, txt, json file(s) etc. for the ease of processing in the following subtasks. + """ -def problem_solving(task: str) -> str: +def process_subtasks( + subtasks: List[Dict[str, Any]], + task: str, + solved_dependent_sub_tasks: str, +) -> str: """ - Solves a given complex task by decomposing it into subtasks, planning, - solving, verifying, and synthesizing the final answer. + Process and solve subtasks recursively while handling failures and replanning when necessary. + This function implements a robust task-solving pipeline that includes verification, + failure tracking, and dynamic replanning capabilities. Args: - task (str): The overall task description to solve. + subtasks (List[Dict[str, Any]]): List of subtasks to be processed, where each subtask + is a dictionary containing task instructions and metadata. + task (str): The overall task description that provides context for subtask processing. + solved_dependent_sub_tasks (str): String containing the accumulated results and context + from previously solved subtasks. Returns: - str: The final synthesized answer to the overall task. - - This function orchestrates the problem-solving process by: - - Using the planner agent to decompose the task into manageable subtasks. - - Iteratively processing each subtask: - - Solving the subtask using the solver agent. - - Verifying the solution with the verifier agent. - - If verification fails, invoking the replanning agent - to adjust the plan. - - Once all subtasks are successfully solved, synthesizing the results - using the synthesizer agent to produce the final answer. + str: The final synthesized answer after processing all subtasks, incorporating + the results from successful subtask executions and any necessary replanning. """ - - task_msg: Msg = Msg(name="Planner", role="system", content=task) - subtasks: List[Dict[str, Any]] = planner_agent(task_msg) - solved_dependent_sub_tasks: str = "" - + global total_failure_count, max_total_failures, replanner_agent subtask_index: int = 0 + aggregated_result: str = "" while subtask_index < len(subtasks): - print("current subtask:", subtasks[subtask_index]["instruction"]) + print("current subtask:", subtasks[subtask_index]) if subtask_index > 0: solved_dependent_sub_tasks += str(subtasks[subtask_index - 1]) - prompt: str = STRUCTUAL_PROMPT.format( + prompt: str = STRUCTURAL_PROMPT.format( overall_task=task, solved_sub_tasks=solved_dependent_sub_tasks, current_sub_task=subtasks[subtask_index], @@ -167,7 +176,7 @@ def problem_solving(task: str) -> str: verdict: str = "non" failure_count: int = 0 - max_failure: int = 1 # Adjust as needed + max_failure: int = 3 # Adjust as needed result: Optional[Msg] = None while "True" not in verdict[-5:]: if verdict != "non": @@ -177,11 +186,18 @@ def problem_solving(task: str) -> str: content=prompt + " VERDICT: " + verdict, ) failure_count += 1 + total_failure_count += 1 + if failure_count > max_failure: - # Call the replanning agent - result_content: str = ( - result.content if "result" in locals() else "" - ) + # Check if total failures exceed max_total_failures + if total_failure_count > max_total_failures: + print("Exceeded maximum total failures. Aborting.") + return ( + "Failed to solve the task due to excessive failures." + ) + + # Call the replanner agent + result_content: str = str(result.content) if result else "" msg_replan: Msg = Msg( name="replanner", role="system", @@ -202,36 +218,35 @@ def problem_solving(task: str) -> str: ) decision: str output: Any - decision, output = replanning_agent(msg_replan) + decision, output = replanner_agent(msg_replan) if decision == "decompose_subtask": - # Recursively solve the decomposed sub-subtasks - print( - "Decomposing current subtask into sub-subtasks...", - ) - subtask_instruction: str = subtasks[subtask_index][ - "instruction" - ] - # Call problem_solving recursively - subtask_result: str = problem_solving( - subtask_instruction, + # Decompose current subtask into sub-subtasks + print("Decomposing current subtask into sub-subtasks...") + # Recursively process the new subtasks + final_answer = process_subtasks( + output, + task, + solved_dependent_sub_tasks, ) - # Store the result - subtasks[subtask_index]["result"] = subtask_result - # Increment subtask_index to move to the next subtask + # After processing sub-subtasks, set the result of current subtask + subtasks[subtask_index]["result"] = final_answer + aggregated_result += final_answer subtask_index += 1 - # Break out of the failure loop - break + break # Break the while loop if decision == "replan_subtask": # Update subtasks with the new plan print("Replanning current and subsequent subtasks...") # Replace current and subsequent subtasks - subtasks = subtasks[:subtask_index] + output - # subtask_index remains the same to retry solving the current subtask - # Break out to restart processing with the new plan - break + # subtasks = subtasks[:subtask_index] + output + subtasks = copy.deepcopy(output) + # Reset failure_count + failure_count = 0 + # Continue with the updated subtasks + break # Break and restart the while loop with new subtasks raise ValueError( "Unknown decision from replanning_agent.", ) + # Proceed with solving the subtask result = solver_agent(msg) msg_verifier: Msg = Msg( @@ -245,168 +260,178 @@ def problem_solving(task: str) -> str: + "\ncurrent_sub_task: " + subtasks[subtask_index]["instruction"] + "\nresult: " - + result.content + + str(result.content) ), ) verdict = verifier_agent(msg_verifier).content # Store the result if verification passed if "True" in verdict[-5:]: - subtasks[subtask_index]["result"] = result.content + subtasks[subtask_index]["result"] = str(result.content) + aggregated_result += str(result.content) subtask_index += 1 # Move to the next subtask - else: - # Handle the case where verification never passed - print("Unable to solve subtask after replanning.") - break # Exit if unable to solve even after replanning - - # Once all subtasks are solved, synthesize the final answer + # Reset failure_count after a successful subtask + failure_count = 0 + # else: + # # Handle the case where verification never passed + # print("Unable to solve subtask after replanning.") + # return "Failed to solve the subtask after replanning." + + # Once all subtasks are processed, synthesize the final answer msg_synthesizer: Msg = Msg( name="synthesizer", role="system", content="overall_task: " + task + "\nsubtasks: " + str(subtasks), ) - final_answer: str = synthesizer_agent(msg_synthesizer).content + final_answer = synthesizer_agent(msg_synthesizer).content return final_answer -agentscope.init( - model_configs=[ - { - "config_name": "gpt_config", - "model_type": "openai_chat", - # "model_name": "chatgpt-4o-latest", - # "model_name": "gpt-4o-mini", - "model_name": "o1-mini", - "api_key": openai_api_key, - # "generate_args": { - # "temperature": 0.0, - # }, - }, - { - "config_name": "dashscope", - "model_type": "dashscope_chat", - "model_name": "qwen-max-1201", - "api_key": dashscope_api_key, - "generate_args": { - "temperature": 0.0, - }, - }, - { - "config_name": "lite_llm_claude", - "model_type": "litellm_chat", - "model_name": "claude-3-5-sonnet-20240620", - "generate_args": { - # "max_tokens": 4096, - "temperature": 0.0, - }, - }, - { - "model_type": "post_api_chat", - "config_name": "my_post_api", - "api_url": "https://xxx", - "headers": {}, - }, - ], - project="Multi-Agent Conversation", - save_api_invoke=True, -) - -# Create a ServiceToolkit instance -service_toolkit = ServiceToolkit() -# Add your tools to the service_toolkit here if needed -service_toolkit.add( - execute_python_code, -) -service_toolkit.add( - list_directory_content, -) -service_toolkit.add( - get_current_directory, -) -service_toolkit.add( - execute_shell_command, -) +def problem_solving(task: str) -> str: + """ + Solve the given task by planning, processing subtasks, and synthesizing the final answer. -# Init the DataInterpreterAgent -planner_agent = PlannerAgent( - name="planner", - sys_prompt="You're a helpful assistant.", - model_config_name="lite_llm_claude", - service_toolkit=service_toolkit, -) + Args: + task (str): The task description to be solved. -solver_agent = ReActAgent( - name="solver", - sys_prompt="You're a helpful assistant.", - model_config_name="lite_llm_claude", - service_toolkit=service_toolkit, -) + Returns: + str: The final solution to the task. + """ + global total_failure_count, max_total_failures + total_failure_count = 0 + max_total_failures = 10 # Adjust as needed -verifier_agent = VerifierAgent( - name="verifier", - sys_prompt="You're a helpful assistant.", - model_config_name="lite_llm_claude", - service_toolkit=service_toolkit, -) + task_msg: Msg = Msg(name="Planner", role="system", content=task) + subtasks: List[Dict[str, Any]] = planner_agent(task_msg) + solved_dependent_sub_tasks: str = "" -synthesizer_agent = SynthesizerAgent( - name="synthesizer", - sys_prompt="You're a helpful assistant.", - model_config_name="lite_llm_claude", -) + final_answer = process_subtasks(subtasks, task, solved_dependent_sub_tasks) -replanning_agent = ReplanningAgent( - name="reviser", - sys_prompt="You're a helpful assistant.", - model_config_name="lite_llm_claude", - service_toolkit=service_toolkit, -) + return final_answer -input_task = "Solve this math problem: The greatest common divisor of positive integers m and n is 6. The least common multiple of m and n is 126. What is the least possible value of m + n?" -# template = "https://arxiv.org/list/{tag}/pastweek?skip=0&show=50" -# tags = ["cs.ai", "cs.cl", "cs.ls", "cs.se"] -# # tags = ["cs.AI"] -# urls = [template.format(tag=tag) for tag in tags] -# task = f"""This is a collection of arxiv urls: '{urls}' . -# Record each article, remove duplicates by title (they may have multiple tags), filter out papers related to -# large language model / agent / llm, print top 10 and visualize the word count of the titles""" +def init_agents() -> None: + """ + Initialize all agents with the required configurations. + """ + global planner_agent, solver_agent, verifier_agent, synthesizer_agent, replanner_agent + + agentscope.init( + model_configs=[ + { + "config_name": "gpt_config", + "model_type": "openai_chat", + # "model_name": "chatgpt-4o-latest", + "model_name": "gpt-4o-mini", + # "model_name": "gpt-4o", + # "model_name": "o1-mini", + "api_key": openai_api_key, + "generate_args": { + "temperature": 0.0, + }, + }, + { + "config_name": "dashscope", + "model_type": "dashscope_chat", + "model_name": "qwen-max-1201", + "api_key": dashscope_api_key, + "generate_args": { + "temperature": 0.0, + }, + }, + { + "config_name": "lite_llm_claude", + "model_type": "litellm_chat", + "model_name": "claude-3-5-haiku-20241022", + # "model_name": "claude-3-5-sonnet-20241022", + "generate_args": { + # "max_tokens": 4096, + "temperature": 0.0, + }, + }, + { + "model_type": "post_api_chat", + "config_name": "my_post_api", + "api_url": "https://xxx", + "headers": {}, + }, + { + "config_name": "dashscope_chat", + "model_type": "dashscope_chat", + "model_name": "qwen-max", + "api_key": "sk-94d038e92230451a87ac37ac34dd6a8a", + "generate_args": { + "temperature": 0.7, + }, + }, + ], + project="Multi-Agent Conversation", + save_api_invoke=True, + use_monitor=True, # Enable token usage monitoring + ) -# sd_url = "http://your.sd.service.ip:port" -# task = ( -# f"I want to generate an image of a beautiful girl using the stable diffusion text2image tool, sd_url={sd_url}" -# ) + # Create a ServiceToolkit instance + service_toolkit = ServiceToolkit() + # Add your tools to the service_toolkit here if needed + service_toolkit.add( + execute_python_code, + ) + service_toolkit.add( + list_directory_content, + ) + service_toolkit.add( + get_current_directory, + ) + service_toolkit.add( + execute_shell_command, + ) -# task = "Create a Snake game. Players need to control the movement of the snake to eat food and grow its body, while avoiding the snake's head touching their own body or game boundaries. Games need to have basic game logic, user interface. During the production process, please consider factors such as playability, beautiful interface, and convenient operation of the game. Note: pyxel environment already satisfied" + # Initialize the agents + planner_agent = PlannerAgent( + name="planner", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + service_toolkit=service_toolkit, + ) -# task = """ -# Get products data from website https://scrapeme.live/shop/ and save it as a csv file. -# **Notice: Firstly parse the web page encoding and the text HTML structure; -# The first page product name, price, product URL, and image URL must be saved in the csv;** -# """ -# task = """" -# Get data from `paperlist` table in https://papercopilot.com/statistics/iclr-statistics/iclr-2024-statistics/, -# and save it to a csv file. paper title must include `multiagent` or `large language model`. *notice: print key variables* -# Don't fetch too much data at a time due to context window size.""" + solver_agent = ReActAgent( + name="solver", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + service_toolkit=service_toolkit, + ) -# task = "Run data analysis on sklearn Iris dataset, include a plot" + verifier_agent = VerifierAgent( + name="verifier", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + service_toolkit=service_toolkit, + ) -# WINE_REQ = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy." + synthesizer_agent = SynthesizerAgent( + name="synthesizer", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + ) -# DATA_DIR = "path/to/your/data" -# # sales_forecast data from https://www.kaggle.com/datasets/aslanahmedov/walmart-sales-forecast/data -# SALES_FORECAST_REQ = f"""Train a model to predict sales for each department in every store (split the last 40 weeks records as validation dataset, the others is train dataset), include plot total sales trends, print metric and plot scatter plots of -# groud truth and predictions on validation data. Dataset is {DATA_DIR}/train.csv, the metric is weighted mean absolute error (WMAE) for test data. Notice: *print* key variables to get more information for next task step. -# """ + replanner_agent = ReplanningAgent( + name="replanner", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + service_toolkit=service_toolkit, + ) -# REQUIREMENTS = {"wine": WINE_REQ, "sales_forecast": SALES_FORECAST_REQ} -# task = REQUIREMENTS["wine"] +def main() -> None: + """Initialize agents and run an example task through the problem-solving pipeline.""" + # Initialize agents + init_agents() -# task = "This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '/Users/zhangzeyu/Documents/agentscope/04_titanic/split_train.csv', eval data path: '/Users/zhangzeyu/Documents/agentscope/04_titanic/split_eval.csv'." + # Example task (you can replace this with any task) + input_task = "Your task description here." + final_solution = problem_solving(input_task) + print("final solution: ", final_solution) -# task = "Create a Snake game. Players need to control the movement of the snake to eat food and grow its body, while avoiding the snake's head touching their own body or game boundaries. Games need to have basic game logic, user interface. During the production process, please consider factors such as playability, beautiful interface, and convenient operation of the game. Note: pyxel environment already satisfied" -# task = "Get products data from website https://scrapeme.live/shop/ and save it as a csv file. Notice: Firstly parse the web page encoding and the text HTML structure; The first page product name, price, product URL, and image URL must be saved in the csv;" -final_solution = problem_solving(input_task) -print("final solution: ", final_solution) +if __name__ == "__main__": + main()