From 3dc5731d61f1e76b0e7049b0390cd1a78ecf9c30 Mon Sep 17 00:00:00 2001 From: Alan Date: Fri, 20 Dec 2024 17:52:25 +0800 Subject: [PATCH] update video understanding example with DnC operator --- .gitignore | 1 + .../general_dnc/agent/conclude/conclude.py | 26 ++++--- .../agent/{conqueror => conclude}/__init__.py | 0 .../agent/conclude/conclude.py | 75 +++++++++++++++++++ .../agent/conclude/sys_prompt.prompt | 13 ++++ .../agent/conclude/user_prompt.prompt | 7 ++ .../agent/divider/__init__.py | 0 .../video_preprocessor/video_preprocess.py | 2 +- .../video_understanding/agent/video_qa/qa.py | 4 +- .../configs/llms/text_res_stream.yml | 7 ++ .../configs/workers/conclude.yml | 2 +- .../configs/workers/dnc_workflow.yml | 18 +++++ .../configs/workers/task_conqueror.yml | 5 -- .../configs/workers/task_divider.yml | 5 -- .../configs/workers/task_exit_monitor.yml | 1 - .../configs/workers/task_rescue.yml | 5 -- .../video_understanding_workflow_diagram.png | 4 +- examples/video_understanding/run_cli.py | 40 +++------- omagent-core/src/omagent_core/clients/base.py | 8 +- 19 files changed, 155 insertions(+), 68 deletions(-) rename examples/video_understanding/agent/{conqueror => conclude}/__init__.py (100%) mode change 100644 => 100755 create mode 100644 examples/video_understanding/agent/conclude/conclude.py create mode 100644 examples/video_understanding/agent/conclude/sys_prompt.prompt create mode 100644 examples/video_understanding/agent/conclude/user_prompt.prompt delete mode 100644 examples/video_understanding/agent/divider/__init__.py create mode 100755 examples/video_understanding/configs/llms/text_res_stream.yml create mode 100755 examples/video_understanding/configs/workers/dnc_workflow.yml delete mode 100755 examples/video_understanding/configs/workers/task_conqueror.yml delete mode 100755 examples/video_understanding/configs/workers/task_divider.yml delete mode 100755 examples/video_understanding/configs/workers/task_exit_monitor.yml delete mode 100755 examples/video_understanding/configs/workers/task_rescue.yml diff --git a/.gitignore b/.gitignore index 927deb5..de03502 100644 --- a/.gitignore +++ b/.gitignore @@ -150,6 +150,7 @@ tests data/ tests/mathvista running_logs/ +video_cache/ *.db # vscode diff --git a/examples/general_dnc/agent/conclude/conclude.py b/examples/general_dnc/agent/conclude/conclude.py index 48efd51..e199938 100644 --- a/examples/general_dnc/agent/conclude/conclude.py +++ b/examples/general_dnc/agent/conclude/conclude.py @@ -9,6 +9,7 @@ from pydantic import Field from omagent_core.advanced_components.workflow.dnc.schemas.dnc_structure import TaskTree from omagent_core.utils.logger import logging +from openai import Stream CURRENT_PATH = root_path = Path(__file__).parents[0] @@ -56,16 +57,19 @@ def _run(self, dnc_structure: dict, last_output: str, *args, **kwargs): result=str(last_output), img_placeholders="".join(list(self.stm(self.workflow_instance_id).get('image_cache', {}).keys())), ) - last_output = 'Answer: ' - self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg='Answer: ') - for chunk in chat_complete_res: - if chunk.choices[0].delta.content is not None: - self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg=f'{chunk.choices[0].delta.content}') - last_output += chunk.choices[0].delta.content - else: - self.callback.send_block(agent_id=self.workflow_instance_id, msg='') - last_output += '' - break - # self.callback.send_answer(agent_id=self.workflow_instance_id, msg=f'Answer: {chat_complete_res["choices"][0]["message"]["content"]}') + if isinstance(chat_complete_res, Stream): + last_output = 'Answer: ' + self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg='Answer: ') + for chunk in chat_complete_res: + if chunk.choices[0].delta.content is not None: + self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg=f'{chunk.choices[0].delta.content}') + last_output += chunk.choices[0].delta.content + else: + self.callback.send_block(agent_id=self.workflow_instance_id, msg='') + last_output += '' + break + else: + last_output = chat_complete_res["choices"][0]["message"]["content"] + self.callback.send_answer(agent_id=self.workflow_instance_id, msg=f'Answer: {chat_complete_res["choices"][0]["message"]["content"]}') self.stm(self.workflow_instance_id).clear() return {'last_output': last_output} \ No newline at end of file diff --git a/examples/video_understanding/agent/conqueror/__init__.py b/examples/video_understanding/agent/conclude/__init__.py old mode 100644 new mode 100755 similarity index 100% rename from examples/video_understanding/agent/conqueror/__init__.py rename to examples/video_understanding/agent/conclude/__init__.py diff --git a/examples/video_understanding/agent/conclude/conclude.py b/examples/video_understanding/agent/conclude/conclude.py new file mode 100644 index 0000000..e199938 --- /dev/null +++ b/examples/video_understanding/agent/conclude/conclude.py @@ -0,0 +1,75 @@ +from pathlib import Path +from typing import List + +from omagent_core.models.llms.base import BaseLLMBackend +from omagent_core.engine.worker.base import BaseWorker +from omagent_core.models.llms.prompt import PromptTemplate +from omagent_core.memories.ltms.ltm import LTM +from omagent_core.utils.registry import registry +from pydantic import Field +from omagent_core.advanced_components.workflow.dnc.schemas.dnc_structure import TaskTree +from omagent_core.utils.logger import logging +from openai import Stream + + +CURRENT_PATH = root_path = Path(__file__).parents[0] + + +@registry.register_worker() +class Conclude(BaseLLMBackend, BaseWorker): + prompts: List[PromptTemplate] = Field( + default=[ + PromptTemplate.from_file( + CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system" + ), + PromptTemplate.from_file( + CURRENT_PATH.joinpath("user_prompt.prompt"), role="user" + ), + ] + ) + + def _run(self, dnc_structure: dict, last_output: str, *args, **kwargs): + """A conclude node that summarizes and completes the root task. + + This component acts as the final node that: + - Takes the root task and its execution results + - Generates a final conclusion/summary of the entire task execution + - Formats and presents the final output in a clear way + - Cleans up any temporary state/memory used during execution + + The conclude node is responsible for providing a coherent final response that + addresses the original root task objective based on all the work done by + previous nodes. + + Args: + agent_task (dict): The task tree containing the root task and results + last_output (str): The final output from previous task execution + *args: Additional arguments + **kwargs: Additional keyword arguments + + Returns: + dict: Final response containing the conclusion/summary + """ + task = TaskTree(**dnc_structure) + self.callback.info(agent_id=self.workflow_instance_id, progress=f'Conclude', message=f'{task.get_current_node().task}') + chat_complete_res = self.simple_infer( + task=task.get_root().task, + result=str(last_output), + img_placeholders="".join(list(self.stm(self.workflow_instance_id).get('image_cache', {}).keys())), + ) + if isinstance(chat_complete_res, Stream): + last_output = 'Answer: ' + self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg='Answer: ') + for chunk in chat_complete_res: + if chunk.choices[0].delta.content is not None: + self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg=f'{chunk.choices[0].delta.content}') + last_output += chunk.choices[0].delta.content + else: + self.callback.send_block(agent_id=self.workflow_instance_id, msg='') + last_output += '' + break + else: + last_output = chat_complete_res["choices"][0]["message"]["content"] + self.callback.send_answer(agent_id=self.workflow_instance_id, msg=f'Answer: {chat_complete_res["choices"][0]["message"]["content"]}') + self.stm(self.workflow_instance_id).clear() + return {'last_output': last_output} \ No newline at end of file diff --git a/examples/video_understanding/agent/conclude/sys_prompt.prompt b/examples/video_understanding/agent/conclude/sys_prompt.prompt new file mode 100644 index 0000000..f28aef3 --- /dev/null +++ b/examples/video_understanding/agent/conclude/sys_prompt.prompt @@ -0,0 +1,13 @@ +As the final stage of our task processing workflow, your role is to inform the user about the final execution result of the task. +Your task includes two parts: +1. Verify the result, ensure it is a valid result of the user's question or task. +2. Image may be visual prompted by adding bound boxes and labels to the image, this is the important information. +3. Generate the output message since you may get some raw data, you have to get the useful information and generate a detailed message. + +The task may complete successfully or it can be failed for some reason. You just need to honestly express the situation. + +*** Important Notice *** +1. Please use the language used in the question when responding. +2. Your response MUST be based on the results provided to you. Do not attempt to solve the problem on your own or try to correct any errors. +3. Do not mention your source of information. Present the response as if it were your own. +4. Handle the conversions between different units carefully. \ No newline at end of file diff --git a/examples/video_understanding/agent/conclude/user_prompt.prompt b/examples/video_understanding/agent/conclude/user_prompt.prompt new file mode 100644 index 0000000..4d3f333 --- /dev/null +++ b/examples/video_understanding/agent/conclude/user_prompt.prompt @@ -0,0 +1,7 @@ +Now, it's your turn to complete the task. + +Task (The task you need to complete.): {{task}} +result (The result from former agents.): {{result}} +images: {{img_placeholders}} + +Now show your super capability as a super agent that beyond regular AIs or LLMs! \ No newline at end of file diff --git a/examples/video_understanding/agent/divider/__init__.py b/examples/video_understanding/agent/divider/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/examples/video_understanding/agent/video_preprocessor/video_preprocess.py b/examples/video_understanding/agent/video_preprocessor/video_preprocess.py index 5fa4f12..931e64c 100755 --- a/examples/video_understanding/agent/video_preprocessor/video_preprocess.py +++ b/examples/video_understanding/agent/video_preprocessor/video_preprocess.py @@ -43,7 +43,7 @@ class VideoPreprocessor(BaseLLMBackend, BaseWorker): show_progress: bool = True use_cache: bool = False - cache_dir: str = "./running_logs/video_cache" + cache_dir: str = "./video_cache" @field_validator("stt", mode="before") @classmethod diff --git a/examples/video_understanding/agent/video_qa/qa.py b/examples/video_understanding/agent/video_qa/qa.py index 4ea6753..15d1dc1 100755 --- a/examples/video_understanding/agent/video_qa/qa.py +++ b/examples/video_understanding/agent/video_qa/qa.py @@ -69,6 +69,4 @@ def _run(self, video_md5: str, video_path: str, instance_id: str, *args, **kwarg "frame_rate": video.stream.frame_rate, "video_summary": "\n---\n".join(related_information), } - tree = TaskTree() - tree.add_node({"task": question}) - return {"agent_task": tree.model_dump(), 'last_output': None} + return {"query": question, 'last_output': None} diff --git a/examples/video_understanding/configs/llms/text_res_stream.yml b/examples/video_understanding/configs/llms/text_res_stream.yml new file mode 100755 index 0000000..e7dedc2 --- /dev/null +++ b/examples/video_understanding/configs/llms/text_res_stream.yml @@ -0,0 +1,7 @@ +name: OpenaiGPTLLM +model_id: gpt-4o-mini +api_key: ${env| custom_openai_key, openai_api_key} +endpoint: ${env| custom_openai_endpoint, https://api.openai.com/v1} +temperature: 0 +stream: true +response_format: text \ No newline at end of file diff --git a/examples/video_understanding/configs/workers/conclude.yml b/examples/video_understanding/configs/workers/conclude.yml index ce21864..a85218c 100755 --- a/examples/video_understanding/configs/workers/conclude.yml +++ b/examples/video_understanding/configs/workers/conclude.yml @@ -1,4 +1,4 @@ name: Conclude -llm: ${sub|text_res} +llm: ${sub|text_res_stream} output_parser: name: StrParser \ No newline at end of file diff --git a/examples/video_understanding/configs/workers/dnc_workflow.yml b/examples/video_understanding/configs/workers/dnc_workflow.yml new file mode 100755 index 0000000..9d248e4 --- /dev/null +++ b/examples/video_understanding/configs/workers/dnc_workflow.yml @@ -0,0 +1,18 @@ +- name: ConstructDncPayload +- name: StructureUpdate +- name: TaskConqueror + llm: ${sub|json_res} + tool_manager: ${sub|all_tools} + output_parser: + name: StrParser +- name: TaskDivider + llm: ${sub|json_res} + tool_manager: ${sub|all_tools} + output_parser: + name: StrParser +- name: TaskRescue + llm: ${sub|text_res} + tool_manager: ${sub|all_tools} + output_parser: + name: StrParser +- name: TaskExitMonitor diff --git a/examples/video_understanding/configs/workers/task_conqueror.yml b/examples/video_understanding/configs/workers/task_conqueror.yml deleted file mode 100755 index 905a441..0000000 --- a/examples/video_understanding/configs/workers/task_conqueror.yml +++ /dev/null @@ -1,5 +0,0 @@ -name: TaskConqueror -llm: ${sub|json_res} -tool_manager: ${sub|all_tools} -output_parser: - name: StrParser \ No newline at end of file diff --git a/examples/video_understanding/configs/workers/task_divider.yml b/examples/video_understanding/configs/workers/task_divider.yml deleted file mode 100755 index cb9cfdf..0000000 --- a/examples/video_understanding/configs/workers/task_divider.yml +++ /dev/null @@ -1,5 +0,0 @@ -name: TaskDivider -llm: ${sub|json_res} -tool_manager: ${sub|all_tools} -output_parser: - name: StrParser \ No newline at end of file diff --git a/examples/video_understanding/configs/workers/task_exit_monitor.yml b/examples/video_understanding/configs/workers/task_exit_monitor.yml deleted file mode 100755 index 844a437..0000000 --- a/examples/video_understanding/configs/workers/task_exit_monitor.yml +++ /dev/null @@ -1 +0,0 @@ -name: TaskExitMonitor \ No newline at end of file diff --git a/examples/video_understanding/configs/workers/task_rescue.yml b/examples/video_understanding/configs/workers/task_rescue.yml deleted file mode 100755 index e79dd29..0000000 --- a/examples/video_understanding/configs/workers/task_rescue.yml +++ /dev/null @@ -1,5 +0,0 @@ -name: TaskRescue -llm: ${sub|text_res} -tool_manager: ${sub|all_tools} -output_parser: - name: StrParser \ No newline at end of file diff --git a/examples/video_understanding/docs/images/video_understanding_workflow_diagram.png b/examples/video_understanding/docs/images/video_understanding_workflow_diagram.png index bc596f9..0a27d9b 100644 --- a/examples/video_understanding/docs/images/video_understanding_workflow_diagram.png +++ b/examples/video_understanding/docs/images/video_understanding_workflow_diagram.png @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:c67ace7251c7ab98ee484eaa3311a106b4bc968338ebb0d3b3fbedb348b874af -size 70400 +oid sha256:34663f4f1aae493f3c70071e244161b0d76313d02e765ad504d92cc9c69a5485 +size 115559 diff --git a/examples/video_understanding/run_cli.py b/examples/video_understanding/run_cli.py index cefe11e..6bcb1b7 100644 --- a/examples/video_understanding/run_cli.py +++ b/examples/video_understanding/run_cli.py @@ -11,7 +11,11 @@ from omagent_core.engine.workflow.task.switch_task import SwitchTask from omagent_core.engine.workflow.task.do_while_task import DnCLoopTask, InfiniteLoopTask from omagent_core.utils.build import build_from_file +from agent.conclude.conclude import Conclude +from agent.video_preprocessor.video_preprocess import VideoPreprocessor +from agent.video_qa.qa import VideoQA from omagent_core.engine.automator.task_handler import TaskHandler +from omagent_core.advanced_components.workflow.dnc.workflow import DnCWorkflow logging.init_logger("omagent", "omagent", level="INFO") @@ -30,44 +34,20 @@ workflow = ConductorWorkflow(name='video_understanding') # 1. Video preprocess task for video preprocessing -video_preprocess_task = simple_task(task_def_name='VideoPreprocessor', task_reference_name='video_preprocess') +video_preprocess_task = simple_task(task_def_name=VideoPreprocessor, task_reference_name='video_preprocess') # 2. Video QA task for video QA -video_qa_task = simple_task(task_def_name='VideoQA', task_reference_name='video_qa', inputs={'video_md5': video_preprocess_task.output('video_md5'), 'video_path': video_preprocess_task.output('video_path'), 'instance_id': video_preprocess_task.output('instance_id')}) +video_qa_task = simple_task(task_def_name=VideoQA, task_reference_name='video_qa', inputs={'video_md5': video_preprocess_task.output('video_md5'), 'video_path': video_preprocess_task.output('video_path'), 'instance_id': video_preprocess_task.output('instance_id')}) -# Divide-and-conquer workflow -# 3. Initialize set variable task for global workflow variables -init_set_variable_task = SetVariableTask(task_ref_name='set_variable_task', input_parameters={'agent_task': video_qa_task.output('agent_task'), 'last_output': video_qa_task.output('last_output')}) - -# 4. Conqueror task for task generation and update global workflow variables -conqueror_task = simple_task(task_def_name='TaskConqueror', task_reference_name='task_conqueror', inputs={'agent_task': '${workflow.variables.agent_task}', 'last_output': '${workflow.variables.last_output}'}) -conqueror_update_set_variable_task = SetVariableTask(task_ref_name='conqueror_update_set_variable_task', input_parameters={'agent_task': '${task_conqueror.output.agent_task}', 'last_output': '${task_conqueror.output.last_output}'}) - -# 5. Divider task for task division and update global workflow variables -divider_task = simple_task(task_def_name='TaskDivider', task_reference_name='task_divider', inputs={'agent_task': conqueror_task.output('agent_task'), 'last_output': conqueror_task.output('last_output')}) -divider_update_set_variable_task = SetVariableTask(task_ref_name='divider_update_set_variable_task', input_parameters={'agent_task': '${task_divider.output.agent_task}', 'last_output': '${task_divider.output.last_output}'}) - -# 6. Rescue task for task rescue -rescue_task = simple_task(task_def_name='TaskRescue', task_reference_name='task_rescue', inputs={'agent_task': conqueror_task.output('agent_task'), 'last_output': conqueror_task.output('last_output')}) +dnc_workflow = DnCWorkflow() +dnc_workflow.set_input(query=video_qa_task.output('query')) # 7. Conclude task for task conclusion -conclude_task = simple_task(task_def_name='Conclude', task_reference_name='task_conclude', inputs={'agent_task': '${task_exit_monitor.output.agent_task}', 'last_output': '${task_exit_monitor.output.last_output}'}) - -# 8. Switch task for task routing -switch_task = SwitchTask(task_ref_name='switch_task', case_expression=conqueror_task.output('switch_case_value')) -switch_task.default_case([conqueror_update_set_variable_task]) -switch_task.switch_case("complex", [divider_task, divider_update_set_variable_task]) -switch_task.switch_case("failed", rescue_task) - -# 9. Task exit monitor task for task exit monitoring and update global workflow variables -task_exit_monitor_task = simple_task(task_def_name='TaskExitMonitor', task_reference_name='task_exit_monitor', inputs={'agent_task': '${workflow.variables.agent_task}', 'last_output': '${workflow.variables.last_output}'}) -post_set_variable_task = SetVariableTask(task_ref_name='post_set_variable_task', input_parameters={'agent_task': '${task_exit_monitor.output.agent_task}', 'last_output': '${task_exit_monitor.output.last_output}'}) +conclude_task = simple_task(task_def_name=Conclude, task_reference_name='task_conclude', inputs={'dnc_structure': dnc_workflow.dnc_structure, 'last_output': dnc_workflow.last_output}) -# 10. DnC loop task for task loop -dncloop_task = DnCLoopTask(task_ref_name='dncloop_task', tasks=[conqueror_task, switch_task], post_loop_exit=[task_exit_monitor_task, post_set_variable_task]) # Configure workflow execution flow: Input -> Initialize global variables -> DnC Loop -> Conclude -workflow >> video_preprocess_task >> video_qa_task >> init_set_variable_task >> dncloop_task >> conclude_task +workflow >> video_preprocess_task >> video_qa_task >> dnc_workflow >> conclude_task # Register workflow workflow.register(overwrite=True) diff --git a/omagent-core/src/omagent_core/clients/base.py b/omagent-core/src/omagent_core/clients/base.py index 8681332..bee2073 100644 --- a/omagent-core/src/omagent_core/clients/base.py +++ b/omagent-core/src/omagent_core/clients/base.py @@ -15,7 +15,7 @@ class CallbackBase(BotBase, ABC): bot_id: str start_time: str = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S") - folder_name: str = f"./running_logs/{start_time}" + # folder_name: str = f"./running_logs/{start_time}" class Config: """Configuration for this pydantic object.""" @@ -23,9 +23,9 @@ class Config: arbitrary_types_allowed = True extra = "allow" - @model_validator(mode="after") - def init_folder(self): - Path(self.folder_name).mkdir(parents=True, exist_ok=True) + # @model_validator(mode="after") + # def init_folder(self): + # Path(self.folder_name).mkdir(parents=True, exist_ok=True) @abstractmethod def send_block(self, **kwargs):