Skip to content

Commit

Permalink
Merge pull request #136 from XeonHis/develop/v0.2.1
Browse files Browse the repository at this point in the history
Develop/v0.2.1: Update video understanding example with DnC operator
  • Loading branch information
panregedit authored Dec 20, 2024
2 parents 4fb2d5f + a9d55c6 commit 4166ce8
Show file tree
Hide file tree
Showing 19 changed files with 155 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ tests
data/
tests/mathvista
running_logs/
video_cache/
*.db

# vscode
Expand Down
26 changes: 15 additions & 11 deletions examples/general_dnc/agent/conclude/conclude.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pydantic import Field
from omagent_core.advanced_components.workflow.dnc.schemas.dnc_structure import TaskTree
from omagent_core.utils.logger import logging
from openai import Stream


CURRENT_PATH = root_path = Path(__file__).parents[0]
Expand Down Expand Up @@ -56,16 +57,19 @@ def _run(self, dnc_structure: dict, last_output: str, *args, **kwargs):
result=str(last_output),
img_placeholders="".join(list(self.stm(self.workflow_instance_id).get('image_cache', {}).keys())),
)
last_output = 'Answer: '
self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg='Answer: ')
for chunk in chat_complete_res:
if chunk.choices[0].delta.content is not None:
self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg=f'{chunk.choices[0].delta.content}')
last_output += chunk.choices[0].delta.content
else:
self.callback.send_block(agent_id=self.workflow_instance_id, msg='')
last_output += ''
break
# self.callback.send_answer(agent_id=self.workflow_instance_id, msg=f'Answer: {chat_complete_res["choices"][0]["message"]["content"]}')
if isinstance(chat_complete_res, Stream):
last_output = 'Answer: '
self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg='Answer: ')
for chunk in chat_complete_res:
if chunk.choices[0].delta.content is not None:
self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg=f'{chunk.choices[0].delta.content}')
last_output += chunk.choices[0].delta.content
else:
self.callback.send_block(agent_id=self.workflow_instance_id, msg='')
last_output += ''
break
else:
last_output = chat_complete_res["choices"][0]["message"]["content"]
self.callback.send_answer(agent_id=self.workflow_instance_id, msg=f'Answer: {chat_complete_res["choices"][0]["message"]["content"]}')
self.stm(self.workflow_instance_id).clear()
return {'last_output': last_output}
File renamed without changes.
75 changes: 75 additions & 0 deletions examples/video_understanding/agent/conclude/conclude.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from pathlib import Path
from typing import List

from omagent_core.models.llms.base import BaseLLMBackend
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.models.llms.prompt import PromptTemplate
from omagent_core.memories.ltms.ltm import LTM
from omagent_core.utils.registry import registry
from pydantic import Field
from omagent_core.advanced_components.workflow.dnc.schemas.dnc_structure import TaskTree
from omagent_core.utils.logger import logging
from openai import Stream


CURRENT_PATH = root_path = Path(__file__).parents[0]


@registry.register_worker()
class Conclude(BaseLLMBackend, BaseWorker):
prompts: List[PromptTemplate] = Field(
default=[
PromptTemplate.from_file(
CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system"
),
PromptTemplate.from_file(
CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
),
]
)

def _run(self, dnc_structure: dict, last_output: str, *args, **kwargs):
"""A conclude node that summarizes and completes the root task.
This component acts as the final node that:
- Takes the root task and its execution results
- Generates a final conclusion/summary of the entire task execution
- Formats and presents the final output in a clear way
- Cleans up any temporary state/memory used during execution
The conclude node is responsible for providing a coherent final response that
addresses the original root task objective based on all the work done by
previous nodes.
Args:
agent_task (dict): The task tree containing the root task and results
last_output (str): The final output from previous task execution
*args: Additional arguments
**kwargs: Additional keyword arguments
Returns:
dict: Final response containing the conclusion/summary
"""
task = TaskTree(**dnc_structure)
self.callback.info(agent_id=self.workflow_instance_id, progress=f'Conclude', message=f'{task.get_current_node().task}')
chat_complete_res = self.simple_infer(
task=task.get_root().task,
result=str(last_output),
img_placeholders="".join(list(self.stm(self.workflow_instance_id).get('image_cache', {}).keys())),
)
if isinstance(chat_complete_res, Stream):
last_output = 'Answer: '
self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg='Answer: ')
for chunk in chat_complete_res:
if chunk.choices[0].delta.content is not None:
self.callback.send_incomplete(agent_id=self.workflow_instance_id, msg=f'{chunk.choices[0].delta.content}')
last_output += chunk.choices[0].delta.content
else:
self.callback.send_block(agent_id=self.workflow_instance_id, msg='')
last_output += ''
break
else:
last_output = chat_complete_res["choices"][0]["message"]["content"]
self.callback.send_answer(agent_id=self.workflow_instance_id, msg=f'Answer: {chat_complete_res["choices"][0]["message"]["content"]}')
self.stm(self.workflow_instance_id).clear()
return {'last_output': last_output}
13 changes: 13 additions & 0 deletions examples/video_understanding/agent/conclude/sys_prompt.prompt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
As the final stage of our task processing workflow, your role is to inform the user about the final execution result of the task.
Your task includes two parts:
1. Verify the result, ensure it is a valid result of the user's question or task.
2. Image may be visual prompted by adding bound boxes and labels to the image, this is the important information.
3. Generate the output message since you may get some raw data, you have to get the useful information and generate a detailed message.

The task may complete successfully or it can be failed for some reason. You just need to honestly express the situation.

*** Important Notice ***
1. Please use the language used in the question when responding.
2. Your response MUST be based on the results provided to you. Do not attempt to solve the problem on your own or try to correct any errors.
3. Do not mention your source of information. Present the response as if it were your own.
4. Handle the conversions between different units carefully.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Now, it's your turn to complete the task.

Task (The task you need to complete.): {{task}}
result (The result from former agents.): {{result}}
images: {{img_placeholders}}

Now show your super capability as a super agent that beyond regular AIs or LLMs!
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class VideoPreprocessor(BaseLLMBackend, BaseWorker):
show_progress: bool = True

use_cache: bool = False
cache_dir: str = "./running_logs/video_cache"
cache_dir: str = "./video_cache"

@field_validator("stt", mode="before")
@classmethod
Expand Down
4 changes: 1 addition & 3 deletions examples/video_understanding/agent/video_qa/qa.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,4 @@ def _run(self, video_md5: str, video_path: str, instance_id: str, *args, **kwarg
"frame_rate": video.stream.frame_rate,
"video_summary": "\n---\n".join(related_information),
}
tree = TaskTree()
tree.add_node({"task": question})
return {"agent_task": tree.model_dump(), 'last_output': None}
return {"query": question, 'last_output': None}
7 changes: 7 additions & 0 deletions examples/video_understanding/configs/llms/text_res_stream.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name: OpenaiGPTLLM
model_id: gpt-4o-mini
api_key: ${env| custom_openai_key, openai_api_key}
endpoint: ${env| custom_openai_endpoint, https://api.openai.com/v1}
temperature: 0
stream: true
response_format: text
2 changes: 1 addition & 1 deletion examples/video_understanding/configs/workers/conclude.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Conclude
llm: ${sub|text_res}
llm: ${sub|text_res_stream}
output_parser:
name: StrParser
18 changes: 18 additions & 0 deletions examples/video_understanding/configs/workers/dnc_workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
- name: ConstructDncPayload
- name: StructureUpdate
- name: TaskConqueror
llm: ${sub|json_res}
tool_manager: ${sub|all_tools}
output_parser:
name: StrParser
- name: TaskDivider
llm: ${sub|json_res}
tool_manager: ${sub|all_tools}
output_parser:
name: StrParser
- name: TaskRescue
llm: ${sub|text_res}
tool_manager: ${sub|all_tools}
output_parser:
name: StrParser
- name: TaskExitMonitor

This file was deleted.

5 changes: 0 additions & 5 deletions examples/video_understanding/configs/workers/task_divider.yml

This file was deleted.

This file was deleted.

5 changes: 0 additions & 5 deletions examples/video_understanding/configs/workers/task_rescue.yml

This file was deleted.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
40 changes: 10 additions & 30 deletions examples/video_understanding/run_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from omagent_core.engine.workflow.task.switch_task import SwitchTask
from omagent_core.engine.workflow.task.do_while_task import DnCLoopTask, InfiniteLoopTask
from omagent_core.utils.build import build_from_file
from agent.conclude.conclude import Conclude
from agent.video_preprocessor.video_preprocess import VideoPreprocessor
from agent.video_qa.qa import VideoQA
from omagent_core.engine.automator.task_handler import TaskHandler
from omagent_core.advanced_components.workflow.dnc.workflow import DnCWorkflow

logging.init_logger("omagent", "omagent", level="INFO")

Expand All @@ -30,44 +34,20 @@
workflow = ConductorWorkflow(name='video_understanding')

# 1. Video preprocess task for video preprocessing
video_preprocess_task = simple_task(task_def_name='VideoPreprocessor', task_reference_name='video_preprocess')
video_preprocess_task = simple_task(task_def_name=VideoPreprocessor, task_reference_name='video_preprocess')

# 2. Video QA task for video QA
video_qa_task = simple_task(task_def_name='VideoQA', task_reference_name='video_qa', inputs={'video_md5': video_preprocess_task.output('video_md5'), 'video_path': video_preprocess_task.output('video_path'), 'instance_id': video_preprocess_task.output('instance_id')})
video_qa_task = simple_task(task_def_name=VideoQA, task_reference_name='video_qa', inputs={'video_md5': video_preprocess_task.output('video_md5'), 'video_path': video_preprocess_task.output('video_path'), 'instance_id': video_preprocess_task.output('instance_id')})

# Divide-and-conquer workflow
# 3. Initialize set variable task for global workflow variables
init_set_variable_task = SetVariableTask(task_ref_name='set_variable_task', input_parameters={'agent_task': video_qa_task.output('agent_task'), 'last_output': video_qa_task.output('last_output')})

# 4. Conqueror task for task generation and update global workflow variables
conqueror_task = simple_task(task_def_name='TaskConqueror', task_reference_name='task_conqueror', inputs={'agent_task': '${workflow.variables.agent_task}', 'last_output': '${workflow.variables.last_output}'})
conqueror_update_set_variable_task = SetVariableTask(task_ref_name='conqueror_update_set_variable_task', input_parameters={'agent_task': '${task_conqueror.output.agent_task}', 'last_output': '${task_conqueror.output.last_output}'})

# 5. Divider task for task division and update global workflow variables
divider_task = simple_task(task_def_name='TaskDivider', task_reference_name='task_divider', inputs={'agent_task': conqueror_task.output('agent_task'), 'last_output': conqueror_task.output('last_output')})
divider_update_set_variable_task = SetVariableTask(task_ref_name='divider_update_set_variable_task', input_parameters={'agent_task': '${task_divider.output.agent_task}', 'last_output': '${task_divider.output.last_output}'})

# 6. Rescue task for task rescue
rescue_task = simple_task(task_def_name='TaskRescue', task_reference_name='task_rescue', inputs={'agent_task': conqueror_task.output('agent_task'), 'last_output': conqueror_task.output('last_output')})
dnc_workflow = DnCWorkflow()
dnc_workflow.set_input(query=video_qa_task.output('query'))

# 7. Conclude task for task conclusion
conclude_task = simple_task(task_def_name='Conclude', task_reference_name='task_conclude', inputs={'agent_task': '${task_exit_monitor.output.agent_task}', 'last_output': '${task_exit_monitor.output.last_output}'})

# 8. Switch task for task routing
switch_task = SwitchTask(task_ref_name='switch_task', case_expression=conqueror_task.output('switch_case_value'))
switch_task.default_case([conqueror_update_set_variable_task])
switch_task.switch_case("complex", [divider_task, divider_update_set_variable_task])
switch_task.switch_case("failed", rescue_task)

# 9. Task exit monitor task for task exit monitoring and update global workflow variables
task_exit_monitor_task = simple_task(task_def_name='TaskExitMonitor', task_reference_name='task_exit_monitor', inputs={'agent_task': '${workflow.variables.agent_task}', 'last_output': '${workflow.variables.last_output}'})
post_set_variable_task = SetVariableTask(task_ref_name='post_set_variable_task', input_parameters={'agent_task': '${task_exit_monitor.output.agent_task}', 'last_output': '${task_exit_monitor.output.last_output}'})
conclude_task = simple_task(task_def_name=Conclude, task_reference_name='task_conclude', inputs={'dnc_structure': dnc_workflow.dnc_structure, 'last_output': dnc_workflow.last_output})

# 10. DnC loop task for task loop
dncloop_task = DnCLoopTask(task_ref_name='dncloop_task', tasks=[conqueror_task, switch_task], post_loop_exit=[task_exit_monitor_task, post_set_variable_task])

# Configure workflow execution flow: Input -> Initialize global variables -> DnC Loop -> Conclude
workflow >> video_preprocess_task >> video_qa_task >> init_set_variable_task >> dncloop_task >> conclude_task
workflow >> video_preprocess_task >> video_qa_task >> dnc_workflow >> conclude_task

# Register workflow
workflow.register(overwrite=True)
Expand Down
8 changes: 4 additions & 4 deletions omagent-core/src/omagent_core/clients/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
class CallbackBase(BotBase, ABC):
bot_id: str
start_time: str = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
folder_name: str = f"./running_logs/{start_time}"
# folder_name: str = f"./running_logs/{start_time}"

class Config:
"""Configuration for this pydantic object."""

arbitrary_types_allowed = True
extra = "allow"

@model_validator(mode="after")
def init_folder(self):
Path(self.folder_name).mkdir(parents=True, exist_ok=True)
# @model_validator(mode="after")
# def init_folder(self):
# Path(self.folder_name).mkdir(parents=True, exist_ok=True)

@abstractmethod
def send_block(self, **kwargs):
Expand Down

0 comments on commit 4166ce8

Please sign in to comment.