add dangerous driving detection example #111

Open · wants to merge 4 commits into base: feature/v0.2.1/examples
3 changes: 3 additions & 0 deletions docs/images/dangerous_driving_detction.png
107 changes: 107 additions & 0 deletions examples/dangerous_driving_detection/README.md
@@ -0,0 +1,107 @@
# Dangerous Driving Detection Example

This example demonstrates how to use the framework for a dangerous driving detection task. The example code can be found in the `examples/dangerous_driving_detection` directory.

```bash
cd examples/dangerous_driving_detection
```

## Overview

This example implements a dangerous driving detection workflow based on the loop workflow, which consists of the following components:

1. **Face Image Task**
- Provides the driver's facial expression image captured while driving

2. **Hand Image Task**
- Provides the driver's hand image captured while driving

3. **Text Task**
- Provides the vehicle speed while driving

4. **Loop Decider Task**
- Loops over the three input tasks above and determines whether dangerous driving is occurring.
- If yes, it analyzes the reasons for the dangerous driving and provides recommendations;
- If not, it reports that the driver is not engaged in dangerous driving.

The system uses Redis for state management and Conductor for workflow orchestration.

### The whole workflow looks like the following diagram:

![Dangerous Driving Detection Workflow](../../docs/images/dangerous_driving_detction.png)
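
As a rough illustration of the loop structure shown above, the sketch below chains the three input workers and the loop decider inside a do-while loop. It is hypothetical: `ConductorWorkflow`, `simple_task`, `DoWhileTask`, their parameter names, and the termination condition are assumptions borrowed from other OmAgent loop examples rather than code from this PR, so the actual wiring in `run_cli.py` may differ.

```python
# Hypothetical sketch of the loop composition; class and parameter names are
# assumptions based on other OmAgent examples, not code from this PR.
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
from omagent_core.engine.workflow.task.simple_task import simple_task
from omagent_core.engine.workflow.task.do_while_task import DoWhileTask

workflow = ConductorWorkflow(name='dangerous_driving_detection')

face_task = simple_task(task_def_name='FaceImageInput', task_reference_name='face_input')
hand_task = simple_task(task_def_name='HandImageInput', task_reference_name='hand_input')
speed_task = simple_task(task_def_name='InputInterface', task_reference_name='speed_input')
decider_task = simple_task(task_def_name='LoopDecider', task_reference_name='loop_decider')

# Repeat the three inputs plus the decision step until the decider signals an exit
detection_loop = DoWhileTask(
    task_ref_name='detection_loop',
    tasks=[face_task, hand_task, speed_task, decider_task],
    termination_condition='if ($.loop_decider["decision"] == true){false;} else {true;}',
)

workflow >> detection_loop
```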

## Prerequisites

- Python 3.10+
- Required packages installed (see requirements.txt)
- Access to OpenAI API or compatible endpoint (see configs/llms/*.yml)
- [Optional] Access to Bing API for WebSearch tool (see configs/tools/*.yml)
- Redis server running locally or remotely
- Conductor server running locally or remotely

## Configuration

The container.yaml file is a configuration file that manages dependencies and settings for different components of the system, including Conductor connections, Redis connections, and other service configurations. To set up your configuration:

1. Generate the container.yaml file:
```bash
python compile_container.py
```
This will create a container.yaml file with default settings under `examples/dangerous_driving_detection`.


2. Configure your LLM and tool settings in `configs/llms/*.yml` and `configs/tools/*.yml`:
- Set your OpenAI API key or compatible endpoint through environment variable or by directly modifying the yml file
```bash
export custom_openai_key="your_openai_api_key"
export custom_openai_endpoint="your_openai_endpoint"
```
- [Optional] Set your Bing API key or compatible endpoint through environment variable or by directly modifying the yml file
```bash
export bing_api_key="your_bing_api_key"
```
**Note: Setting the Bing API key is not mandatory, as the WebSearch tool will fall back to DuckDuckGo search, but setting it is recommended for better search quality.**
- The default text encoder configuration uses OpenAI `text-embedding-3-large` with **3072** dimensions; make sure the `dim` value of `MilvusLTM` in `container.yaml` matches (see the sanity check after this list)
- Configure other model settings such as temperature as needed, through environment variables or by directly modifying the yml files

3. Update settings in the generated `container.yaml`:
- Modify Redis connection settings:
- Set the host, port and credentials for your Redis instance
- Configure both `redis_stream_client` and `redis_stm_client` sections
- Update the Conductor server URL under conductor_config section
- Configure MilvusLTM in `components` section:
- Set the `storage_name` and `dim` for MilvusLTM
- Set `dim` to **3072** if you use the default OpenAI encoder; if you use a different text encoder model or endpoint, change the dimension accordingly
- Adjust other settings as needed
- Configure hyper-parameters for the video preprocess task in `examples/video_understanding/configs/workers/video_preprocessor.yml`
- `use_cache`: Whether to use cache for the video preprocess task
- `scene_detect_threshold`: The threshold for scene detection, used to determine whether a scene change occurs in the video; a smaller value means more scenes will be detected; the default value is **27**
- `frame_extraction_interval`: The interval between frames extracted from the video; the default value is **5**
- `kernel_size`: The kernel size for scene detection; it should be an **odd** number and is calculated automatically from the video resolution by default. For hour-long videos it is recommended to leave it blank, but for short videos a smaller value such as **3** or **5** makes detection more sensitive to scene changes
- `stt.endpoint`: The endpoint for the speech-to-text service; defaults to the OpenAI ASR service
- `stt.api_key`: The API key for the speech-to-text service; defaults to the OpenAI API key
- Adjust any other component settings as needed
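
Before moving on, it can be worth sanity-checking the LLM key, endpoint, and encoder dimension configured in steps 2 and 3. The snippet below is a hypothetical check that assumes the `openai` Python package and the default model names used in `configs/llms`; swap in your own endpoint and encoder if they differ.

```python
# Hypothetical sanity check for the LLM and encoder settings; model names and
# environment variables mirror the defaults described above.
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.environ["custom_openai_key"],
    base_url=os.environ.get("custom_openai_endpoint", "https://api.openai.com/v1"),
)

# The chat model used by configs/llms/gpt.yml should be reachable
reply = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "ping"}],
)
print(reply.choices[0].message.content)

# The embedding size should match the `dim` configured for MilvusLTM
emb = client.embeddings.create(model="text-embedding-3-large", input="dimension check")
print(len(emb.data[0].embedding))  # expected: 3072
```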

## Running the Example

Run the dangerous driving detection example (currently only CLI usage is supported):

```bash
python run_cli.py
```

## Troubleshooting

If you encounter issues:
- Verify Redis is running and accessible (see the connectivity check after this list)
- Try a larger `scene_detect_threshold` and `frame_extraction_interval` if too many scenes are detected
- Check your OpenAI API key is valid
- Check your Bing API key is valid if search results are not as expected
- Check that the `dim` value of `MilvusLTM` in `container.yaml` is set correctly; currently a mismatched dimension will not raise an error but will silently lose part of the information (more checks will be added in the future)
- Ensure all dependencies are installed correctly
- Review logs for any error messages
- **Open an issue on GitHub if you can't find a solution; we will do our best to help you out!**
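
For the Redis check above, a minimal connectivity test might look like the snippet below. It is hypothetical: it assumes the `redis` Python package and a local default instance, so adjust the host, port, and credentials to the values in your `container.yaml`.

```python
# Hypothetical Redis connectivity check; host and port are placeholders and
# should match the redis_stream_client / redis_stm_client settings in container.yaml.
import redis

r = redis.Redis(host="localhost", port=6379, db=0)
print(r.ping())  # True if the Redis server is reachable
```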




Empty file.
@@ -0,0 +1,30 @@
from pathlib import Path

from omagent_core.utils.registry import registry
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.utils.general import read_image

CURRENT_PATH = root_path = Path(__file__).parents[0]


@registry.register_worker()
class FaceImageInput(BaseWorker):
    """Face image input processor.

    Reads the driver's face image (URL or local file path) from the input
    interface and caches it in the workflow's short-term memory for use by
    downstream workers.
    """

    def _run(self, *args, **kwargs):
        # Prompt the user for the driver's face image while driving
        user_input = self.input.read_input(
            workflow_instance_id=self.workflow_instance_id,
            input_prompt="Please enter the face image while driving",
        )

        # Extract the image path or URL from the last message
        image_path = None
        content = user_input['messages'][-1]['content']
        for content_item in content:
            if content_item['type'] == 'image_url':
                image_path = content_item['data']

        # Read the image and cache it in short-term memory; if no image was
        # provided or it cannot be read, continue without a face image
        if image_path is not None:
            try:
                img = read_image(input_source=image_path)
                image_cache = {'<image_0>': img}
                self.stm(self.workflow_instance_id)['face_image_cache'] = image_cache
            except Exception:
                pass

        return

Empty file.
@@ -0,0 +1,41 @@
from pathlib import Path

from omagent_core.utils.registry import registry
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.utils.general import read_image

CURRENT_PATH = root_path = Path(__file__).parents[0]


@registry.register_worker()
class HandImageInput(BaseWorker):
    """Hand image input processor.

    It accepts either an image URL or a local file path as input, reads the
    image, and caches it in the workflow's short-term memory for use by
    downstream processors.

    The processor gracefully handles cases where the user chooses not to
    provide an image or where the provided image cannot be read.

    Attributes:
        None - this worker uses only the base worker functionality
    """

    def _run(self, *args, **kwargs):
        # Prompt the user for the driver's hand image while driving
        user_input = self.input.read_input(
            workflow_instance_id=self.workflow_instance_id,
            input_prompt="Please enter the hand image while driving",
        )

        # Extract the image path or URL from the last message
        image_path = None
        content = user_input['messages'][-1]['content']
        for content_item in content:
            if content_item['type'] == 'image_url':
                image_path = content_item['data']

        # Read the image and cache it in short-term memory; if no image was
        # provided or it cannot be read, continue without a hand image
        if image_path is not None:
            try:
                img = read_image(input_source=image_path)
                image_cache = {'<image_0>': img}
                self.stm(self.workflow_instance_id)['hand_image_cache'] = image_cache
            except Exception:
                pass

        return

Empty file.
@@ -0,0 +1,80 @@
import json_repair
import re
from pathlib import Path
from typing import List
from pydantic import Field

from omagent_core.models.llms.base import BaseLLMBackend
from omagent_core.utils.registry import registry
from omagent_core.models.llms.prompt.prompt import PromptTemplate
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.models.llms.prompt.parser import StrParser
from omagent_core.models.llms.openai_gpt import OpenaiGPTLLM
from omagent_core.utils.logger import logging


CURRENT_PATH = root_path = Path(__file__).parents[0]


@registry.register_worker()
class LoopDecider(BaseLLMBackend, BaseWorker):
    """Loop decider worker that determines whether enough information is available to make a recommendation."""

    llm: OpenaiGPTLLM
    prompts: List[PromptTemplate] = Field(
        default=[
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system"
            ),
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
            ),
        ]
    )

    def _run(self, *args, **kwargs):
        # Retrieve context data from short-term memory, falling back to empty defaults
        hand_image = self.stm(self.workflow_instance_id).get("hand_image_cache") or {}
        face_image = self.stm(self.workflow_instance_id).get("face_image_cache") or {}
        user_instruction = self.stm(self.workflow_instance_id).get("user_instruction") or ""

        # Query the LLM to analyze the available information
        chat_complete_res = self.simple_infer(
            hand_image=hand_image.get("<image_0>"),
            face_image=face_image.get("<image_0>"),
            speed=str(user_instruction),
        )
        content = chat_complete_res["choices"][0]["message"].get("content")
        content = self._extract_from_result(content)
        logging.info(content)

        # Report the reasoning to the user and return the loop decision based on the LLM output
        self.callback.send_answer(agent_id=self.workflow_instance_id, msg=content.get("reason"))
        if content.get("decision") == "no":
            return {"decision": True}
        else:
            return {"decision": False}

    def _extract_from_result(self, result: str) -> dict:
        # Prefer JSON wrapped in a fenced json block; fall back to parsing the whole string
        try:
            pattern = r"```json\s+(.*?)\s+```"
            match = re.search(pattern, result, re.DOTALL)
            if match:
                return json_repair.loads(match.group(1))
            else:
                return json_repair.loads(result)
        except Exception:
            raise ValueError("LLM generation is not valid.")
@@ -0,0 +1,23 @@
You are a helpful consultant assistant capable of gathering information to help users analyze data such as hand images, face images, and speed while driving in order to determine if a driver is involved in dangerous driving.

You will receive:

- User's hand image while driving
- User's face image while driving
- Speed of the vehicle

Your task is to analyze all the provided information and decide if there is enough detail to determine whether the driver is engaged in dangerous driving.

You should reply in the following format:
{
"decision": "yes or no",
"reason": "If yes, specify which particular reasons indicate dangerous driving and provide suggestions. If no, indicate that the driver has no dangerous driving behaviors."
}

In your assessment, consider:

1. The highest speed limit is 120 km/h.
2. Both hands should be on the steering wheel as much as possible; single-hand steering should only occur when shifting gears with the right hand.
3. The face should be attentive, not distracted or with eyes tightly closed, while driving.

Your response must be in valid JSON format. Specifically explain if there is missing information or why the collected information is sufficient.
@@ -0,0 +1,6 @@
Now, it's your turn to complete the task.

Input Information:
- User's hand image while driving: {{hand_image}}
- User's face image while driving: {{face_image}}
- Speed of the vehicle: {{speed}}
Empty file.
@@ -0,0 +1,39 @@
from pathlib import Path

from omagent_core.utils.registry import registry
from omagent_core.utils.general import read_image
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.utils.logger import logging

CURRENT_PATH = Path(__file__).parents[0]


@registry.register_worker()
class InputInterface(BaseWorker):
    """Input interface processor that handles the car speed input.

    This processor:
    1. Reads user input containing the car speed via the input interface
    2. Extracts the text instruction (the speed) from the input
    3. Caches the speed in the workflow's short-term memory for the loop decider
    """

    def _run(self, *args, **kwargs):
        # Read user input through the configured input interface
        user_input = self.input.read_input(
            workflow_instance_id=self.workflow_instance_id,
            input_prompt='Please enter the car speed while driving',
        )

        # Extract the text content (car speed) from the last input message
        user_instruction = None
        content = user_input['messages'][-1]['content']
        for content_item in content:
            if content_item['type'] == 'text':
                user_instruction = content_item['data']

        # Cache the speed in short-term memory for the loop decider
        if user_instruction is not None:
            self.stm(self.workflow_instance_id)['user_instruction'] = user_instruction

        return
18 changes: 18 additions & 0 deletions examples/dangerous_driving_detection/compile_container.py
@@ -0,0 +1,18 @@
# Import core modules and components
from omagent_core.utils.container import container
from omagent_core.utils.registry import registry
from pathlib import Path

# Import all registered modules
registry.import_module()

CURRENT_PATH = Path(__file__).parents[0]


# Register required components
container.register_stm("RedisSTM")
container.register_callback(callback='AppCallback')
container.register_input(input='AppInput')

# Compile container config
container.compile_config(CURRENT_PATH)
6 changes: 6 additions & 0 deletions examples/dangerous_driving_detection/configs/llms/gpt.yml
@@ -0,0 +1,6 @@
name: OpenaiGPTLLM
model_id: gpt-4o
api_key: ${env| custom_openai_key, openai_api_key}
endpoint: ${env| custom_openai_endpoint, https://api.openai.com/v1}
temperature: 0
vision: true
@@ -0,0 +1,6 @@
name: OpenaiGPTLLM
model_id: gpt-4o
api_key: ${env| custom_openai_key, openai_api_key}
endpoint: ${env| custom_openai_endpoint, https://api.openai.com/v1}
temperature: 0
vision: false
@@ -0,0 +1,5 @@
llm: ${sub| text_res}
tools:
- name: WebSearch
bing_api_key: ${env| bing_api_key, null}
llm: ${sub|text_res}
@@ -0,0 +1,2 @@
name: LoopDecider
llm: ${sub| gpt}