diff --git a/docs/images/dangerous_driving_detction.png b/docs/images/dangerous_driving_detction.png
new file mode 100644
index 0000000..e11c5a9
--- /dev/null
+++ b/docs/images/dangerous_driving_detction.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:5a9a02c92e03dcf636da1de6ece620f083f1ec0a3988136440b09f0b689e00f9
+size 43903
diff --git a/examples/dangerous_driving_detection/README.md b/examples/dangerous_driving_detection/README.md
new file mode 100644
index 0000000..9523e56
--- /dev/null
+++ b/examples/dangerous_driving_detection/README.md
@@ -0,0 +1,107 @@
+# Dangerous Driving Detection Example
+
+This example demonstrates how to use the framework for a dangerous driving detection task. The example code can be found in the `examples/dangerous_driving_detection` directory.
+
+```bash
+cd examples/dangerous_driving_detection
+```
+
+## Overview
+
+This example implements a dangerous driving detection workflow based on the loop workflow, which consists of the following components:
+
+1. **Face Image Task**
+   - Provides the driver's facial expression image captured while driving
+
+2. **Hand Image Task**
+   - Provides the driver's hand image captured while driving
+
+3. **Text Task**
+   - Provides the vehicle speed while driving
+
+4. **Loop Decider Task**
+   - Loops over the three tasks above and determines whether dangerous driving is involved.
+   - If yes, it analyzes the reasons for the dangerous driving and provides recommendations;
+   - If not, the driver is not involved in dangerous driving.
+
+The system uses Redis for state management and Conductor for workflow orchestration.
+
+### The whole workflow looks like the following diagram:
+
+![Dangerous Driving Detection Workflow](../../docs/images/dangerous_driving_detction.png)
+
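+The loop is wired together in `run_cli.py`, which is included in full later in this example. The abridged excerpt below shows the structure: three input tasks followed by the loop decider, all wrapped in a `DoWhileTask`; see `run_cli.py` for the complete, runnable version.
+
+```python
+from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
+from omagent_core.engine.workflow.task.simple_task import simple_task
+from omagent_core.engine.workflow.task.do_while_task import DoWhileTask
+
+workflow = ConductorWorkflow(name='dangerous_driving_detection')
+
+# One task per input source, plus the loop decider
+task1 = simple_task(task_def_name='HandImageInput', task_reference_name='hand_image_input')
+task2 = simple_task(task_def_name='FaceImageInput', task_reference_name='face_image_input')
+task3 = simple_task(task_def_name='InputInterface', task_reference_name='text_input')
+task4 = simple_task(task_def_name='LoopDecider', task_reference_name='loop_decider')
+
+# Repeat all four tasks until the decider returns decision=true
+car_qa_loop = DoWhileTask(task_ref_name='car_loop', tasks=[task1, task2, task3, task4],
+                          termination_condition='if ($.loop_decider["decision"] == true){false;} else {true;} ')
+
+workflow >> car_qa_loop
+```
+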
+## Prerequisites
+
+- Python 3.10+
+- Required packages installed (see requirements.txt)
+- Access to the OpenAI API or a compatible endpoint (see configs/llms/*.yml)
+- [Optional] Access to the Bing API for the WebSearch tool (see configs/tools/*.yml)
+- Redis server running locally or remotely
+- Conductor server running locally or remotely
+
+## Configuration
+
+The container.yaml file manages dependencies and settings for the different components of the system, including Conductor connections, Redis connections, and other service configurations. To set up your configuration:
+
+1. Generate the container.yaml file:
+   ```bash
+   python compile_container.py
+   ```
+   This will create a container.yaml file with default settings under `examples/dangerous_driving_detection`.
+
+2. Configure your LLM and tool settings in `configs/llms/*.yml` and `configs/tools/*.yml`:
+   - Set your OpenAI API key or compatible endpoint through environment variables or by directly modifying the yml files:
+     ```bash
+     export custom_openai_key="your_openai_api_key"
+     export custom_openai_endpoint="your_openai_endpoint"
+     ```
+   - [Optional] Set your Bing API key through an environment variable or by directly modifying the yml file:
+     ```bash
+     export bing_api_key="your_bing_api_key"
+     ```
+     **Note: Setting the Bing API key is not mandatory; the WebSearch tool will fall back to DuckDuckGo search. Setting it is recommended for better search quality.**
+   - Configure other model settings, such as temperature, as needed through environment variables or by directly modifying the yml files
+
+3. Update settings in the generated `container.yaml` (a rough sketch of the relevant sections follows after this list):
+   - Modify the Redis connection settings:
+     - Set the host, port and credentials for your Redis instance
+     - Configure both the `redis_stream_client` and `redis_stm_client` sections
+   - Update the Conductor server URL under the `conductor_config` section
+   - Adjust any other component settings as needed
+
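+After running `compile_container.py`, the generated file already contains these sections with default values. The snippet below is only an illustrative sketch of step 3: the exact field names and nesting come from the generated template and may differ, so treat the keys and values shown here as placeholders rather than the exact schema.
+
+```yaml
+conductor_config:
+  base_url: http://localhost:8080   # placeholder: your Conductor server URL
+
+redis_stream_client:
+  host: localhost                   # placeholder: your Redis host
+  port: 6379
+  password: null                    # set credentials if your Redis requires them
+
+redis_stm_client:
+  host: localhost
+  port: 6379
+  password: null
+```
+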
+## Running the Example
+
+Run the dangerous driving detection example. Currently only CLI usage is supported:
+
+```bash
+python run_cli.py
+```
+
+## Troubleshooting
+
+If you encounter issues:
+- Verify Redis is running and accessible
+- Check that your OpenAI API key is valid
+- Check that your Bing API key is valid if search results are not as expected
+- Ensure all dependencies are installed correctly
+- Review the logs for any error messages
+- **Open an issue on GitHub if you can't find a solution; we will do our best to help you out!**
diff --git a/examples/dangerous_driving_detection/agent/face_image_input/__init__.py b/examples/dangerous_driving_detection/agent/face_image_input/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/dangerous_driving_detection/agent/face_image_input/image_input.py b/examples/dangerous_driving_detection/agent/face_image_input/image_input.py
new file mode 100644
index 0000000..ae8cb53
--- /dev/null
+++ b/examples/dangerous_driving_detection/agent/face_image_input/image_input.py
@@ -0,0 +1,30 @@
+from pathlib import Path
+
+from omagent_core.utils.registry import registry
+from omagent_core.engine.worker.base import BaseWorker
+from omagent_core.utils.general import read_image
+from omagent_core.utils.logger import logging
+
+CURRENT_PATH = Path(__file__).parents[0]
+
+
+@registry.register_worker()
+class FaceImageInput(BaseWorker):
+    """Face image input processor.
+
+    Prompts the user for the driver's face image (URL or local file path), reads it,
+    and caches it in the workflow's short-term memory for downstream workers.
+    """
+
+    def _run(self, *args, **kwargs):
+        # Prompt the user for the face image captured while driving
+        user_input = self.input.read_input(
+            workflow_instance_id=self.workflow_instance_id,
+            input_prompt="Please enter the face image while driving")
+
+        # Extract the image path or URL from the last user message
+        image_path = None
+        content = user_input['messages'][-1]['content']
+        for content_item in content:
+            if content_item['type'] == 'image_url':
+                image_path = content_item['data']
+
+        # Read the image and cache it in short-term memory for the loop decider
+        try:
+            img = read_image(input_source=image_path)
+            image_cache = {'': img}
+            self.stm(self.workflow_instance_id)['face_image_cache'] = image_cache
+        except Exception as e:
+            logging.warning(f"Failed to read face image: {e}")
+
+        return
diff --git a/examples/dangerous_driving_detection/agent/hand_image_input/__init__.py b/examples/dangerous_driving_detection/agent/hand_image_input/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/dangerous_driving_detection/agent/hand_image_input/image_input.py b/examples/dangerous_driving_detection/agent/hand_image_input/image_input.py
new file mode 100644
index 0000000..90b23a6
--- /dev/null
+++ b/examples/dangerous_driving_detection/agent/hand_image_input/image_input.py
@@ -0,0 +1,41 @@
+from pathlib import Path
+
+from omagent_core.utils.registry import registry
+from omagent_core.engine.worker.base import BaseWorker
+from omagent_core.utils.general import read_image
+from omagent_core.utils.logger import logging
+
+CURRENT_PATH = Path(__file__).parents[0]
+
+
+@registry.register_worker()
+class HandImageInput(BaseWorker):
+    """Hand image input processor.
+
+    Accepts either an image URL or a local file path as input, reads the image,
+    and caches it in the workflow's short-term memory for use by downstream workers.
+    Cases where the user provides no image, or the image cannot be read, are handled
+    gracefully by logging a warning and continuing.
+    """
+
+    def _run(self, *args, **kwargs):
+        # Prompt the user for the hand image captured while driving
+        user_input = self.input.read_input(
+            workflow_instance_id=self.workflow_instance_id,
+            input_prompt="Please enter the hand image while driving")
+
+        # Extract the image path or URL from the last user message
+        image_path = None
+        content = user_input['messages'][-1]['content']
+        for content_item in content:
+            if content_item['type'] == 'image_url':
+                image_path = content_item['data']
+
+        # Read the image and cache it in short-term memory for the loop decider
+        try:
+            img = read_image(input_source=image_path)
+            image_cache = {'': img}
+            self.stm(self.workflow_instance_id)['hand_image_cache'] = image_cache
+        except Exception as e:
+            logging.warning(f"Failed to read hand image: {e}")
+
+        return
diff --git a/examples/dangerous_driving_detection/agent/loop_decider/__init__.py b/examples/dangerous_driving_detection/agent/loop_decider/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/dangerous_driving_detection/agent/loop_decider/loop_decider.py b/examples/dangerous_driving_detection/agent/loop_decider/loop_decider.py
new file mode 100644
index 0000000..c331ede
--- /dev/null
+++ b/examples/dangerous_driving_detection/agent/loop_decider/loop_decider.py
@@ -0,0 +1,80 @@
+import json_repair
+import re
+from pathlib import Path
+from typing import List
+from pydantic import Field
+
+from omagent_core.models.llms.base import BaseLLMBackend
+from omagent_core.utils.registry import registry
+from omagent_core.models.llms.prompt.prompt import PromptTemplate
+from omagent_core.engine.worker.base import BaseWorker
+from omagent_core.models.llms.openai_gpt import OpenaiGPTLLM
+from omagent_core.utils.logger import logging
+
+
+CURRENT_PATH = Path(__file__).parents[0]
+
+
+@registry.register_worker()
+class LoopDecider(BaseLLMBackend, BaseWorker):
+    """Loop decider worker that analyzes the cached hand image, face image and speed,
+    and decides whether dangerous driving is involved and whether the loop should stop."""
+
+    llm: OpenaiGPTLLM
+    prompts: List[PromptTemplate] = Field(
+        default=[
+            PromptTemplate.from_file(
+                CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system"
+            ),
+            PromptTemplate.from_file(
+                CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
+            ),
+        ]
+    )
+
+    def _run(self, *args, **kwargs):
+        # Retrieve context data from short-term memory, falling back to empty defaults
+        hand_image = self.stm(self.workflow_instance_id).get("hand_image_cache") or {}
+        face_image = self.stm(self.workflow_instance_id).get("face_image_cache") or {}
+        user_instruction = self.stm(self.workflow_instance_id).get("user_instruction") or ""
+
+        # Query the LLM to analyze the available information
+        chat_complete_res = self.simple_infer(
+            hand_image=hand_image.get(""),
+            face_image=face_image.get(""),
+            speed=str(user_instruction)
+        )
+        content = chat_complete_res["choices"][0]["message"].get("content")
+        content = self._extract_from_result(content)
+        logging.info(content)
+
+        # Report the reason to the user; stop the loop (decision=True) once the LLM
+        # reports no dangerous driving, otherwise keep looping for new inputs
+        if content.get("decision") == "no":
+            self.callback.send_answer(agent_id=self.workflow_instance_id, msg=content.get("reason"))
+            return {"decision": True}
+        else:
+            self.callback.send_answer(agent_id=self.workflow_instance_id, msg=content.get("reason"))
+            return {"decision": False}
+
+    def _extract_from_result(self, result: str) -> dict:
+        # Prefer JSON wrapped in a ```json ... ``` fence; fall back to parsing the raw text
+        try:
+            pattern = r"```json\s+(.*?)\s+```"
+            match = re.search(pattern, result, re.DOTALL)
+            if match:
+                return json_repair.loads(match.group(1))
+            else:
+                return json_repair.loads(result)
+        except Exception as error:
+            raise ValueError("LLM generation is not valid.") from error
\ No newline at end of file
diff --git a/examples/dangerous_driving_detection/agent/loop_decider/sys_prompt.prompt b/examples/dangerous_driving_detection/agent/loop_decider/sys_prompt.prompt
new file mode 100644
index 0000000..4c26bc3
--- /dev/null
+++ b/examples/dangerous_driving_detection/agent/loop_decider/sys_prompt.prompt
@@ -0,0 +1,23 @@
+You are a helpful consultant assistant capable of gathering information such as hand images, face images, and vehicle speed while driving, in order to determine whether a driver is involved in dangerous driving.
+
+You will receive:
+
+- The user's hand image while driving
+- The user's face image while driving
+- The speed of the vehicle
+
+Your task is to analyze all the provided information and decide whether there is enough detail to determine if the driver is engaged in dangerous driving.
+
+You should reply in the following format:
+{
+    "decision": "yes or no",
+    "reason": "If yes, specify which particular observations indicate dangerous driving and provide suggestions. If no, state that the driver shows no dangerous driving behaviors."
+}
+
+In your assessment, consider:
+
+1. The highest speed limit is 120 km/h.
+2. Both hands should be on the steering wheel as much as possible; single-hand steering should only occur when shifting gears with the right hand.
+3. While driving, the driver's face should be attentive, not distracted, and the eyes should not be tightly closed.
+
+Your response must be in valid JSON format. Explain specifically if information is missing or why the collected information is sufficient.
\ No newline at end of file
diff --git a/examples/dangerous_driving_detection/agent/loop_decider/user_prompt.prompt b/examples/dangerous_driving_detection/agent/loop_decider/user_prompt.prompt
new file mode 100644
index 0000000..2491011
--- /dev/null
+++ b/examples/dangerous_driving_detection/agent/loop_decider/user_prompt.prompt
@@ -0,0 +1,6 @@
+Now, it's your turn to complete the task.
+
+Input Information:
+- User's hand image while driving: {{hand_image}}
+- User's face image while driving: {{face_image}}
+- Speed of the vehicle: {{speed}}
\ No newline at end of file
diff --git a/examples/dangerous_driving_detection/agent/text_input/__init__.py b/examples/dangerous_driving_detection/agent/text_input/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/dangerous_driving_detection/agent/text_input/input_interface.py b/examples/dangerous_driving_detection/agent/text_input/input_interface.py
new file mode 100644
index 0000000..af08656
--- /dev/null
+++ b/examples/dangerous_driving_detection/agent/text_input/input_interface.py
@@ -0,0 +1,39 @@
+from pathlib import Path
+
+from omagent_core.utils.registry import registry
+from omagent_core.engine.worker.base import BaseWorker
+from omagent_core.utils.logger import logging
+
+CURRENT_PATH = Path(__file__).parents[0]
+
+
+@registry.register_worker()
+class InputInterface(BaseWorker):
+    """Input interface processor that handles the user's text input.
+
+    This processor:
+    1. Prompts the user for the current car speed via the input interface
+    2. Extracts the text instruction from the input message
+    3. Caches the instruction in the workflow's short-term memory for the loop decider
+    """
+
+    def _run(self, *args, **kwargs):
+        # Read user input through the configured input interface
+        user_input = self.input.read_input(
+            workflow_instance_id=self.workflow_instance_id,
+            input_prompt='Please enter the car speed while driving')
+
+        # Extract the text content (car speed) from the last user message
+        user_instruction = None
+        content = user_input['messages'][-1]['content']
+        for content_item in content:
+            if content_item['type'] == 'text':
+                user_instruction = content_item['data']
+
+        # Cache the speed instruction in short-term memory for the loop decider
+        try:
+            self.stm(self.workflow_instance_id)['user_instruction'] = user_instruction
+        except Exception as e:
+            logging.warning(f"Failed to cache user instruction: {e}")
diff --git a/examples/dangerous_driving_detection/compile_container.py b/examples/dangerous_driving_detection/compile_container.py
new file mode 100644
index 0000000..07e66c0
--- /dev/null
+++ b/examples/dangerous_driving_detection/compile_container.py
@@ -0,0 +1,18 @@
+# Import core modules and components
+from omagent_core.utils.container import container
+from omagent_core.utils.registry import registry
+from pathlib import Path
+
+# Import all registered modules
+registry.import_module()
+
+CURRENT_PATH = Path(__file__).parents[0]
+
+
+# Register required components
+container.register_stm("RedisSTM")
+container.register_callback(callback='AppCallback')
+container.register_input(input='AppInput')
+
+# Compile container config
+container.compile_config(CURRENT_PATH)
diff --git a/examples/dangerous_driving_detection/configs/llms/gpt.yml b/examples/dangerous_driving_detection/configs/llms/gpt.yml
new file mode 100644
index 0000000..06efefc
--- /dev/null
+++ b/examples/dangerous_driving_detection/configs/llms/gpt.yml
@@ -0,0 +1,6 @@
+name: OpenaiGPTLLM
+model_id: gpt-4o
+api_key: ${env| custom_openai_key, openai_api_key}
+endpoint: ${env| custom_openai_endpoint, https://api.openai.com/v1}
+temperature: 0
+vision: true
\ No newline at end of file
diff --git a/examples/dangerous_driving_detection/configs/llms/text_res.yml b/examples/dangerous_driving_detection/configs/llms/text_res.yml
new file mode 100644
index 0000000..50aef99
--- /dev/null
+++ b/examples/dangerous_driving_detection/configs/llms/text_res.yml
@@ -0,0 +1,6 @@
+name: OpenaiGPTLLM
+model_id: gpt-4o
+api_key: ${env| custom_openai_key, openai_api_key}
+endpoint: ${env| custom_openai_endpoint, https://api.openai.com/v1}
+temperature: 0
+vision: false
\ No newline at end of file
diff --git a/examples/dangerous_driving_detection/configs/tools/websearch.yml b/examples/dangerous_driving_detection/configs/tools/websearch.yml
new file mode 100644
index 0000000..5544864
--- /dev/null
+++ b/examples/dangerous_driving_detection/configs/tools/websearch.yml
@@ -0,0 +1,5 @@
+llm: ${sub| text_res}
+tools:
+  - name: WebSearch
+    bing_api_key: ${env| bing_api_key, null}
+    llm: ${sub| text_res}
\ No newline at end of file
diff --git a/examples/dangerous_driving_detection/configs/workers/loop_decider.yml b/examples/dangerous_driving_detection/configs/workers/loop_decider.yml
new file mode 100644
index 0000000..7833f8e
--- /dev/null
+++ b/examples/dangerous_driving_detection/configs/workers/loop_decider.yml
@@ -0,0 +1,2 @@
+name: LoopDecider
+llm: ${sub| gpt}
\ No newline at end of file
diff --git a/examples/dangerous_driving_detection/run_cli.py b/examples/dangerous_driving_detection/run_cli.py
new file mode 100644
index 0000000..c88ce7a
--- /dev/null
+++ b/examples/dangerous_driving_detection/run_cli.py
@@ -0,0 +1,67 @@
+# Import core modules for workflow management and configuration
+from omagent_core.utils.container import container
+from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
+from omagent_core.engine.workflow.task.simple_task import simple_task
+from omagent_core.utils.logger import logging
+logging.init_logger("omagent", "omagent", level="INFO")
+
+# Import registry and CLI client modules
+from omagent_core.utils.registry import registry
+from omagent_core.clients.devices.cli.client import DefaultClient
+
+from pathlib import Path
+CURRENT_PATH = Path(__file__).parents[0]
+
+# Import and register worker modules from the agent directory
+registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))
+
+# Add the parent directory to the Python path
+import sys
+import os
+sys.path.append(os.path.abspath(CURRENT_PATH.joinpath('../../')))
+
+# Import the custom input workers
+from agent.hand_image_input.image_input import HandImageInput
+from agent.face_image_input.image_input import FaceImageInput
+from agent.text_input.input_interface import InputInterface
+
+# Import the loop task type for the iterative detection loop
+from omagent_core.engine.workflow.task.do_while_task import DoWhileTask
+
+
+# Configure Redis storage and load container settings
+container.register_stm("RedisSTM")
+container.from_config(CURRENT_PATH.joinpath('container.yaml'))
+
+
+# Initialize the dangerous driving detection workflow
+workflow = ConductorWorkflow(name='dangerous_driving_detection')
+
+# Define workflow tasks:
+# 1. Get the hand image from the user
+task1 = simple_task(task_def_name='HandImageInput', task_reference_name='hand_image_input')
+
+# 2. Get the face image from the user
+task2 = simple_task(task_def_name='FaceImageInput', task_reference_name='face_image_input')
+
+# 3. Get the text input (car speed) from the user
+task3 = simple_task(task_def_name='InputInterface', task_reference_name='text_input')
+
+# 4. Check whether the driver is driving dangerously
+task4 = simple_task(task_def_name='LoopDecider', task_reference_name='loop_decider')
+
+
+# The loop terminates when loop_decider returns decision=true
+car_qa_loop = DoWhileTask(task_ref_name='car_loop', tasks=[task1, task2, task3, task4],
+                          termination_condition='if ($.loop_decider["decision"] == true){false;} else {true;} ')
+
+# Define the workflow sequence: workflow -> car_qa_loop
+workflow >> car_qa_loop
+
+# Register the workflow with the Conductor server
+workflow.register(True)
+
+# Initialize and start the CLI client with the workflow and input workers
+config_path = CURRENT_PATH.joinpath('configs')
+cli_client = DefaultClient(interactor=workflow, config_path=config_path, workers=[HandImageInput(), FaceImageInput(), InputInterface()])
+cli_client.start_interactor()