diff --git a/docs/concepts/client.md b/docs/concepts/client.md index 9e33c97..ac132b1 100644 --- a/docs/concepts/client.md +++ b/docs/concepts/client.md @@ -13,10 +13,17 @@ Currently, there are two clients: `DefaultClient` and `AppClient`. - Among them, either `interactor` or `processor` must be chosen to be passed in. `interactor` is the workflow used for interaction, and `processor` is the workflow used for image processing. - At least one of `config_path` and `workers` must be passed in, or both can be passed. `config_path` is the path to the worker configuration file, and `workers` is a list of `Worker` instances. +`WebpageClient` is a web page chat window implemented with gradio, which can be used for interaction. +- The parameters of `WebpageClient` include `interactor`, `processor`, `config_path`, and `workers`. +- Among them, either `interactor` or `processor` must be chosen to be passed in. +- `interactor` is the workflow used for interaction, with a default port of **7860** after startup, and the access address is `http://127.0.0.1:7860`. +- `processor` is the workflow used for image processing, with a default port of **7861** after startup, and the access address is `http://127.0.0.1:7861`. +- At least one of `config_path` and `workers` must be passed in, or both can be passed. `config_path` is the path to the worker configuration file, and `workers` is a list of `Worker` instances. + -The input for `DefaultClient` uses `AppInput`, and the output uses `DefaultCallback`. The input for `AppClient` uses `AppInput`, and the output uses `AppCallback`. +The input for `DefaultClient` uses `AppInput`, and the output uses `DefaultCallback`. The input for `AppClient` uses `AppInput`, and the output uses `AppCallback`. The input for `WebpageClient` uses `AppInput`, and the output uses `AppCallback`. -When writing an agent worker, you don't need to worry about which one to use. Simply call `self.input.read_input()` and `self.callback.send_xxx()`. Depending on whether `DefaultClient` or `AppClient` is instantiated, different input and output logic will be followed. +When writing an agent worker, you don't need to worry about which one to use. Simply call `self.input.read_input()` and `self.callback.send_xxx()`. Depending on whether `DefaultClient` or `AppClient` or `WebpageClient` is instantiated, different input and output logic will be followed. The input has only one method: - `read_input(workflow_instance_id: str, input_prompt = "")` diff --git a/docs/concepts/debug.md b/docs/concepts/debug.md new file mode 100644 index 0000000..2955029 --- /dev/null +++ b/docs/concepts/debug.md @@ -0,0 +1,5 @@ +Debug Mode: +Set `debug: true` in the `conductor_config` section within `container.yaml` to enable debug mode. The debug mode has the following features: +1. Outputs more debug information. +2. After starting, it will stop all workflows with the same name on the conductor and restart a new workflow. +3. There will be no retries after failure, each task will only be executed once. \ No newline at end of file diff --git a/examples/general_dnc/run_webpage.py b/examples/general_dnc/run_webpage.py new file mode 100644 index 0000000..708eb0f --- /dev/null +++ b/examples/general_dnc/run_webpage.py @@ -0,0 +1,73 @@ +# Import required modules and components +import os +from omagent_core.utils.container import container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow +from omagent_core.engine.workflow.task.simple_task import simple_task +from pathlib import Path +from omagent_core.utils.registry import registry +from omagent_core.clients.devices.webpage.client import WebpageClient +from omagent_core.utils.logger import logging +from omagent_core.engine.workflow.task.set_variable_task import SetVariableTask +from omagent_core.engine.workflow.task.switch_task import SwitchTask +from omagent_core.engine.workflow.task.do_while_task import DnCLoopTask +from agent.input_interface.dnc_input_interface import DnCInputIterface +logging.init_logger("omagent", "omagent", level="INFO") + + +# Set current working directory path +CURRENT_PATH = root_path = Path(__file__).parents[0] + +# Import registered modules +registry.import_module() + +# Load container configuration from YAML file +container.register_stm("RedisSTM") +container.from_config(CURRENT_PATH.joinpath('container.yaml')) + +# Initialize simple VQA workflow +workflow = ConductorWorkflow(name='general_dnc') + +# Configure workflow tasks: +# 1. Input interface for user interaction +client_input_task = simple_task(task_def_name='DnCInputIterface', task_reference_name='dnc_input_interface') + +# 2. Initialize set variable task for global workflow variables +init_set_variable_task = SetVariableTask(task_ref_name='set_variable_task', input_parameters={'agent_task': client_input_task.output('agent_task'), 'last_output': client_input_task.output('last_output')}) + +# 3. Conqueror task for task generation and update global workflow variables +conqueror_task = simple_task(task_def_name='TaskConqueror', task_reference_name='task_conqueror', inputs={'agent_task': '${workflow.variables.agent_task}', 'last_output': '${workflow.variables.last_output}'}) +conqueror_update_set_variable_task = SetVariableTask(task_ref_name='conqueror_update_set_variable_task', input_parameters={'agent_task': '${task_conqueror.output.agent_task}', 'last_output': '${task_conqueror.output.last_output}'}) + +# 4. Divider task for task division and update global workflow variables +divider_task = simple_task(task_def_name='TaskDivider', task_reference_name='task_divider', inputs={'agent_task': conqueror_task.output('agent_task'), 'last_output': conqueror_task.output('last_output')}) +divider_update_set_variable_task = SetVariableTask(task_ref_name='divider_update_set_variable_task', input_parameters={'agent_task': '${task_divider.output.agent_task}', 'last_output': '${task_divider.output.last_output}'}) + +# 5. Rescue task for task rescue +rescue_task = simple_task(task_def_name='TaskRescue', task_reference_name='task_rescue', inputs={'agent_task': conqueror_task.output('agent_task'), 'last_output': conqueror_task.output('last_output')}) + +# 6. Conclude task for task conclusion +conclude_task = simple_task(task_def_name='Conclude', task_reference_name='task_conclude', inputs={'agent_task': '${task_exit_monitor.output.agent_task}', 'last_output': '${task_exit_monitor.output.last_output}'}) + +# 7. Switch task for task routing +switch_task = SwitchTask(task_ref_name='switch_task', case_expression=conqueror_task.output('switch_case_value')) +switch_task.default_case([conqueror_update_set_variable_task]) +switch_task.switch_case("complex", [divider_task, divider_update_set_variable_task]) +switch_task.switch_case("failed", rescue_task) + +# 8. Task exit monitor task for task exit monitoring and update global workflow variables +task_exit_monitor_task = simple_task(task_def_name='TaskExitMonitor', task_reference_name='task_exit_monitor', inputs={'agent_task': '${workflow.variables.agent_task}', 'last_output': '${workflow.variables.last_output}'}) +post_set_variable_task = SetVariableTask(task_ref_name='post_set_variable_task', input_parameters={'agent_task': '${task_exit_monitor.output.agent_task}', 'last_output': '${task_exit_monitor.output.last_output}'}) + +# 9. DnC loop task for task loop +dncloop_task = DnCLoopTask(task_ref_name='dncloop_task', tasks=[conqueror_task, switch_task], post_loop_exit=[task_exit_monitor_task, post_set_variable_task]) + +# Configure workflow execution flow: Input -> Initialize global variables -> DnC Loop -> Conclude +workflow >> client_input_task >> init_set_variable_task >> dncloop_task >> conclude_task + +# Register workflow +workflow.register(overwrite=True) + +# Initialize and start app client with workflow configuration +config_path = CURRENT_PATH.joinpath('configs') +agent_client = WebpageClient(interactor=workflow, config_path=config_path, workers=[DnCInputIterface()]) +agent_client.start_interactor() diff --git a/examples/step1_simpleVQA/run_webpage.py b/examples/step1_simpleVQA/run_webpage.py new file mode 100644 index 0000000..4d82b84 --- /dev/null +++ b/examples/step1_simpleVQA/run_webpage.py @@ -0,0 +1,44 @@ +# Import required modules and components +from omagent_core.utils.container import container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow +from omagent_core.engine.workflow.task.simple_task import simple_task +from pathlib import Path +from omagent_core.utils.registry import registry +from omagent_core.clients.devices.webpage.client import WebpageClient +from omagent_core.utils.logger import logging +logging.init_logger("omagent", "omagent", level="INFO") + +# Import agent-specific components +from agent.input_interface.input_interface import InputInterface + +# Set current working directory path +CURRENT_PATH = root_path = Path(__file__).parents[0] + +# Import registered modules +registry.import_module(project_path=CURRENT_PATH.joinpath('agent')) + +container.register_stm("RedisSTM") +# Load container configuration from YAML file +container.from_config(CURRENT_PATH.joinpath('container.yaml')) + + + +# Initialize simple VQA workflow +workflow = ConductorWorkflow(name='step1_simpleVQA') + +# Configure workflow tasks: +# 1. Input interface for user interaction +task1 = simple_task(task_def_name='InputInterface', task_reference_name='input_task') +# 2. Simple VQA processing based on user input +task2 = simple_task(task_def_name='SimpleVQA', task_reference_name='simple_vqa', inputs={'user_instruction': task1.output('user_instruction')}) + +# Configure workflow execution flow: Input -> VQA +workflow >> task1 >> task2 + +# Register workflow +workflow.register(True) + +# Initialize and start app client with workflow configuration +config_path = CURRENT_PATH.joinpath('configs') +agent_client = WebpageClient(interactor=workflow, config_path=config_path, workers=[InputInterface()]) +agent_client.start_interactor() diff --git a/examples/step2_outfit_with_switch/run_webpage.py b/examples/step2_outfit_with_switch/run_webpage.py new file mode 100644 index 0000000..87dd5cc --- /dev/null +++ b/examples/step2_outfit_with_switch/run_webpage.py @@ -0,0 +1,59 @@ +# Import required modules from omagent_core +from omagent_core.utils.container import container # For dependency injection container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow # For workflow management +from omagent_core.engine.workflow.task.simple_task import simple_task # For defining workflow tasks +from omagent_core.utils.registry import registry # For registering components +from omagent_core.clients.devices.webpage.client import WebpageClient # For webpage client interface +from omagent_core.utils.logger import logging # For logging functionality +logging.init_logger("omagent", "omagent", level="INFO") # Initialize logger + +# Set up path configuration +from pathlib import Path +CURRENT_PATH = Path(__file__).parents[0] # Get current directory path +# Import agent modules +registry.import_module(project_path=CURRENT_PATH.joinpath('agent')) + +# Add parent directory to Python path for imports +import sys +import os +sys.path.append(os.path.abspath(CURRENT_PATH.joinpath('../../'))) + +# Import input interface from previous example +from examples.step1_simpleVQA.agent.input_interface.input_interface import InputInterface + + +# Load container configuration from YAML file +# This configures dependencies like Redis connections and API endpoints +container.register_stm("RedisSTM") +container.from_config(CURRENT_PATH.joinpath('container.yaml')) + + +# Initialize outfit recommendation workflow with unique name +# This workflow will handle the outfit recommendation process +workflow = ConductorWorkflow(name='step2_outfit_with_switch') + +# Configure workflow tasks: +# 1. Input interface task to get user's clothing request +task1 = simple_task(task_def_name='InputInterface', task_reference_name='input_task') +# 2. Weather decision task to determine if weather info is needed +task2 = simple_task(task_def_name='WeatherDecider', task_reference_name='weather_decider', inputs={'user_instruction': task1.output('user_instruction')}) +# 3. Weather search task to fetch current weather conditions if needed +task3 = simple_task(task_def_name='WeatherSearcher', task_reference_name='weather_searcher', inputs={'user_instruction': task1.output('user_instruction')}) +# 4. Outfit recommendation task to generate final clothing suggestions +task4 = simple_task(task_def_name='OutfitRecommendation', task_reference_name='outfit_recommendation') + +# Configure workflow execution flow: +# The workflow follows this sequence: +# 1. Get user input +# 2. Analyze if weather info is needed +# 3. Conditionally fetch weather (only if decision task returns 0) +# 4. Generate outfit recommendations based on all gathered info +workflow >> task1 >> task2 >> {0 : task3} >> task4 + +# Register workflow +workflow.register(True) + +# Initialize and start app client with workflow configuration +config_path = CURRENT_PATH.joinpath('configs') +agent_client = WebpageClient(interactor=workflow, config_path=config_path, workers=[InputInterface()]) +agent_client.start_interactor() diff --git a/examples/step3_outfit_with_loop/run_webpage.py b/examples/step3_outfit_with_loop/run_webpage.py new file mode 100644 index 0000000..c00df1f --- /dev/null +++ b/examples/step3_outfit_with_loop/run_webpage.py @@ -0,0 +1,65 @@ +# Import core OmAgent components for workflow management and app functionality +from omagent_core.utils.container import container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow +from omagent_core.engine.workflow.task.simple_task import simple_task +from omagent_core.clients.devices.webpage.client import WebpageClient +from omagent_core.utils.logger import logging +logging.init_logger("omagent", "omagent", level="INFO") + +from omagent_core.utils.registry import registry + +from pathlib import Path +CURRENT_PATH = Path(__file__).parents[0] + +# Import and register worker modules from agent directory +registry.import_module(project_path=CURRENT_PATH.joinpath('agent')) + +# Add parent directory to Python path for imports +import sys +import os +sys.path.append(os.path.abspath(CURRENT_PATH.joinpath('../../'))) + +# Import custom image input worker +from agent.outfit_image_input.outfit_image_input import OutfitImageInput +from examples.step2_outfit_with_switch.agent.outfit_recommendation.outfit_recommendation import OutfitRecommendation + +# Import task type for implementing loops in workflow +from omagent_core.engine.workflow.task.do_while_task import DoWhileTask + + +# Configure container with Redis storage and load settings +container.register_stm("RedisSTM") +container.from_config(CURRENT_PATH.joinpath('container.yaml')) + + +# Initialize workflow for outfit recommendations with loops +workflow = ConductorWorkflow(name='step3_outfit_with_loop') + +# Define workflow tasks: +# 1. Get initial outfit image input +task1 = simple_task(task_def_name='OutfitImageInput', task_reference_name='image_input') + +# 2. Ask questions about the outfit +task2 = simple_task(task_def_name='OutfitQA', task_reference_name='outfit_qa') + +# 3. Decide if enough information is gathered +task3 = simple_task(task_def_name='OutfitDecider', task_reference_name='outfit_decider') + +# 4. Generate final outfit recommendations +task4 = simple_task(task_def_name='OutfitRecommendation', task_reference_name='outfit_recommendation') + +# Create loop that continues Q&A until sufficient information is gathered +# Loop terminates when outfit_decider returns decision=true +outfit_qa_loop = DoWhileTask(task_ref_name='outfit_loop', tasks=[task2, task3], + termination_condition='if ($.outfit_decider["decision"] == true){false;} else {true;} ') + +# Define workflow sequence: image input -> Q&A loop -> final recommendation +workflow >> task1 >> outfit_qa_loop >> task4 + +# Register workflow with conductor server +workflow.register(True) + +# Initialize and start app client with workflow and image input worker +config_path = CURRENT_PATH.joinpath('configs') +agent_client = WebpageClient(interactor=workflow, config_path=config_path, workers=[OutfitImageInput()]) +agent_client.start_interactor() diff --git a/examples/step4_outfit_with_ltm/image_storage/run_image_storage_webpage.py b/examples/step4_outfit_with_ltm/image_storage/run_image_storage_webpage.py new file mode 100644 index 0000000..fdbd164 --- /dev/null +++ b/examples/step4_outfit_with_ltm/image_storage/run_image_storage_webpage.py @@ -0,0 +1,43 @@ +# Import required modules and components +from omagent_core.utils.container import container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow +from omagent_core.engine.workflow.task.simple_task import simple_task +from omagent_core.utils.logger import logging +logging.init_logger("omagent", "omagent", level="INFO") + +# Set up paths and registry +from pathlib import Path +CURRENT_PATH = Path(__file__).parents[0] + +from omagent_core.utils.registry import registry +registry.import_module(project_path=CURRENT_PATH.joinpath('agent')) + +# Import app-specific components +from omagent_core.clients.devices.webpage.client import WebpageClient +from omagent_core.clients.devices.app.image_index import ImageIndexListener + +# Configure container with storage systems +container.register_stm(stm='RedisSTM') +container.register_ltm(ltm='MilvusLTM') +container.from_config(CURRENT_PATH.joinpath('container.yaml')) + +# Initialize image storage workflow +workflow = ConductorWorkflow(name='step4_image_storage') + +# Configure workflow tasks: +# 1. Listen for new images +task1 = simple_task(task_def_name='ImageIndexListener', task_reference_name='image_index_listener') + +# 2. Preprocess images for storage +task2 = simple_task(task_def_name='OutfitImagePreprocessor', task_reference_name='outfit_image_preprocessor', inputs={'image_data': task1.output('output')}) + +# Configure workflow execution flow: Listen -> Preprocess +workflow >> task1 >> task2 + +# Register workflow +workflow.register(True) + +# Initialize and start app client with workflow configuration +config_path = CURRENT_PATH.joinpath('configs') +agent_client = WebpageClient(processor=workflow, config_path=config_path, workers=[ImageIndexListener()]) +agent_client.start_processor() diff --git a/examples/step4_outfit_with_ltm/outfit_from_storage/run_outfit_recommendation_webpage.py b/examples/step4_outfit_with_ltm/outfit_from_storage/run_outfit_recommendation_webpage.py new file mode 100644 index 0000000..3023730 --- /dev/null +++ b/examples/step4_outfit_with_ltm/outfit_from_storage/run_outfit_recommendation_webpage.py @@ -0,0 +1,58 @@ +from omagent_core.utils.container import container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow +from omagent_core.engine.workflow.task.simple_task import simple_task +from omagent_core.utils.logger import logging +logging.init_logger("omagent", "omagent", level="INFO") + +from omagent_core.clients.devices.webpage.client import WebpageClient + +from pathlib import Path +CURRENT_PATH = Path(__file__).parents[0] + +from omagent_core.utils.registry import registry +registry.import_module(project_path=CURRENT_PATH.joinpath('agent')) + +import sys +import os +sys.path.append(os.path.abspath(CURRENT_PATH.joinpath('../../../'))) + +from omagent_core.engine.workflow.task.do_while_task import DoWhileTask +from examples.step3_outfit_with_loop.agent.outfit_decider.outfit_decider import OutfitDecider +from examples.step3_outfit_with_loop.agent.outfit_qa.outfit_qa import OutfitQA + +# Register storage components +container.register_stm(stm='RedisSTM') +container.register_ltm(ltm='MilvusLTM') + +# Load container configuration +container.from_config(CURRENT_PATH.joinpath('container.yaml')) + +# Create main workflow +workflow = ConductorWorkflow(name='step4_outfit_recommendation') + +# Initialize QA task to gather user preferences +task1 = simple_task(task_def_name='OutfitQA', task_reference_name='outfit_qa') + +# Initialize decision task to evaluate QA responses +task2 = simple_task(task_def_name='OutfitDecider', task_reference_name='outfit_decider') + +# Initialize outfit generation task +task3 = simple_task(task_def_name='OutfitGeneration', task_reference_name='outfit_generation') + +# Initialize conclusion task with outfit generation output +task4 = simple_task(task_def_name='OutfitConclusion', task_reference_name='outfit_conclusion', inputs={'proposed_outfit': task3.output('proposed_outfit')}) + +# Create iterative QA loop that continues until a positive decision is reached +outfit_qa_loop = DoWhileTask(task_ref_name='outfit_loop', tasks=[task1, task2], + termination_condition='if ($.outfit_decider["decision"] == true){false;} else {true;} ') + +# Define workflow sequence +workflow >> outfit_qa_loop >> task3 >> task4 + +# Register workflow with conductor +workflow.register(True) + +# Initialize and run CLI client +config_path = CURRENT_PATH.joinpath('configs') +cli_client = WebpageClient(interactor=workflow, config_path=config_path) +cli_client.start_interactor() diff --git a/omagent-core/pyproject.toml b/omagent-core/pyproject.toml index e025e77..2f32f45 100644 --- a/omagent-core/pyproject.toml +++ b/omagent-core/pyproject.toml @@ -49,7 +49,8 @@ dependencies = [ "json_repair >= 0.30.1", "scenedetect >= 0.6.4", "pydub >= 0.25.1", - "face_recognition >= 1.3.0" + "face_recognition >= 1.3.0", + "gradio >= 5.7.1" ] [project.urls] diff --git a/omagent-core/src/omagent_core/clients/devices/app/callback.py b/omagent-core/src/omagent_core/clients/devices/app/callback.py index 2f7dbda..2ecee16 100644 --- a/omagent-core/src/omagent_core/clients/devices/app/callback.py +++ b/omagent-core/src/omagent_core/clients/devices/app/callback.py @@ -53,7 +53,7 @@ def send_to_group(self, stream_name, group_name, data): stream_name, group_name, id="0" ) except Exception as e: - logging.info(f"Consumer group may already exist: {e}") + logging.debug(f"Consumer group may already exist: {e}") def send_base_message( self, diff --git a/omagent-core/src/omagent_core/clients/devices/app/image_index.py b/omagent-core/src/omagent_core/clients/devices/app/image_index.py index 536aaeb..0083538 100644 --- a/omagent-core/src/omagent_core/clients/devices/app/image_index.py +++ b/omagent-core/src/omagent_core/clients/devices/app/image_index.py @@ -28,7 +28,7 @@ def _run(self): stream_name, group_name, id="0", mkstream=True ) except Exception as e: - logging.info(f"Consumer group may already exist: {e}") + logging.debug(f"Consumer group may already exist: {e}") logging.info(f"Listening to Redis stream: {stream_name} in group: {group_name}") flag = False diff --git a/omagent-core/src/omagent_core/clients/devices/app/input.py b/omagent-core/src/omagent_core/clients/devices/app/input.py index 89d11b7..5365135 100644 --- a/omagent-core/src/omagent_core/clients/devices/app/input.py +++ b/omagent-core/src/omagent_core/clients/devices/app/input.py @@ -39,7 +39,7 @@ def read_input(self, workflow_instance_id: str, input_prompt = ""): stream_name, group_name, id="0", mkstream=True ) except Exception as e: - logging.info(f"Consumer group may already exist: {e}") + logging.debug(f"Consumer group may already exist: {e}") logging.info(f"Listening to Redis stream: {stream_name} in group: {group_name} start_id: {start_id}") data_flag = False @@ -196,7 +196,7 @@ def _send_to_group(self, stream_name, group_name, data): stream_name, group_name, id="0" ) except Exception as e: - logging.info(f"Consumer group may already exist: {e}") + logging.debug(f"Consumer group may already exist: {e}") def _send_base_message( self, diff --git a/omagent-core/src/omagent_core/clients/devices/cli/client.py b/omagent-core/src/omagent_core/clients/devices/cli/client.py index 04a6c5e..1f21e4a 100644 --- a/omagent-core/src/omagent_core/clients/devices/cli/client.py +++ b/omagent-core/src/omagent_core/clients/devices/cli/client.py @@ -63,7 +63,7 @@ def start_interactor(self): stream_name, group_name, id="0", mkstream=True ) except Exception as e: - logging.info(f"Consumer group may already exist: {e}") + logging.debug(f"Consumer group may already exist: {e}") while True: diff --git a/omagent-core/src/omagent_core/clients/devices/webpage/client.py b/omagent-core/src/omagent_core/clients/devices/webpage/client.py new file mode 100644 index 0000000..28e09fa --- /dev/null +++ b/omagent-core/src/omagent_core/clients/devices/webpage/client.py @@ -0,0 +1,224 @@ +from time import sleep +import gradio as gr +import json +from omagent_core.clients.devices.app.schemas import ContentStatus, MessageType +from omagent_core.utils.container import container +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow +from omagent_core.utils.build import build_from_file +from omagent_core.engine.automator.task_handler import TaskHandler +from omagent_core.clients.devices.app.callback import AppCallback +from omagent_core.clients.devices.app.input import AppInput +import yaml +from omagent_core.utils.container import container +from omagent_core.utils.registry import registry +from omagent_core.services.connectors.redis import RedisConnector +from omagent_core.utils.logger import logging + +registry.import_module() + +container.register_connector(name='redis_stream_client', connector=RedisConnector) +# container.register_stm(stm='RedisSTM') +container.register_callback(callback=AppCallback) +container.register_input(input=AppInput) + + +class WebpageClient: + def __init__( + self, + interactor: ConductorWorkflow = None, + processor: ConductorWorkflow = None, + config_path: str = "./config", + workers: list = [], + ) -> None: + self._interactor = interactor + self._processor = processor + self._config_path = config_path + self._workers = workers + self._workflow_instance_id = None + + def start_interactor(self): + worker_config = build_from_file(self._config_path) + self._task_handler_interactor = TaskHandler(worker_config=worker_config, workers=self._workers) + self._task_handler_interactor.start_processes() + with gr.Blocks(title="OmAgent") as chat_interface: + chatbot = gr.Chatbot(elem_id="OmAgent", bubble_full_width=False, type="messages") + + chat_input = gr.MultimodalTextbox( + interactive=True, + file_count="multiple", + placeholder="Enter message or upload file...", + show_label=False, + ) + + chat_msg = chat_input.submit( + self.add_message, [chatbot, chat_input], [chatbot, chat_input] + ) + bot_msg = chat_msg.then(self.bot, chatbot, chatbot, api_name="bot_response") + bot_msg.then(lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input]) + chat_interface.launch() + + def stop_interactor(self): + self._task_handler_interactor.stop_processes() + + def start_processor(self): + worker_config = build_from_file(self._config_path) + self._task_handler_processor = TaskHandler(worker_config=worker_config, workers=self._workers) + self._task_handler_processor.start_processes() + + with gr.Blocks(title="OmAgent") as chat_interface: + chatbot = gr.Chatbot(elem_id="OmAgent", bubble_full_width=False, type="messages") + + chat_input = gr.MultimodalTextbox( + interactive=True, + file_count="multiple", + placeholder="Enter message or upload file...", + show_label=False, + ) + + chat_msg = chat_input.submit( + self.add_processor_message, [chatbot, chat_input], [chatbot, chat_input] + ) + bot_msg = chat_msg.then(self.processor_bot, chatbot, chatbot, api_name="bot_response") + bot_msg.then(lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input]) + chat_interface.launch(server_port=7861) + + def stop_processor(self): + self._task_handler_processor.stop_processes() + + def add_message(self, history, message): + if self._workflow_instance_id is None: + self._workflow_instance_id = self._interactor.start_workflow_with_input(workflow_input={}) + contents = [] + for x in message["files"]: + history.append({"role": "user", "content": {"path": x}}) + contents.append({"type": "image_url","data": x}) + if message["text"] is not None: + history.append({"role": "user", "content": message["text"]}) + contents.append({"type": "text","data": message["text"]}) + result = { + "agent_id": self._workflow_instance_id, + "messages": [ + { + "role": "user", + "content": contents + } + ], + "kwargs": {} + } + container.get_connector('redis_stream_client')._client.xadd(f"{self._workflow_instance_id}_input", {"payload":json.dumps(result, ensure_ascii=False) }) + return history, gr.MultimodalTextbox(value=None, interactive=False) + + def add_processor_message(self, history, message): + if self._workflow_instance_id is None: + self._workflow_instance_id = self._processor.start_workflow_with_input(workflow_input={}) + image_items = [] + for idx, x in enumerate(message["files"]): + history.append({"role": "user", "content": {"path": x}}) + image_items.append({ + "type": "image_url", + "resource_id": str(idx), + "data": str(x) + }) + result = { + "content": image_items + } + container.get_connector('redis_stream_client')._client.xadd(f"image_process", {"payload":json.dumps(result, ensure_ascii=False) }) + return history, gr.MultimodalTextbox(value=None, interactive=False) + + def bot(self, history: list): + stream_name = f"{self._workflow_instance_id}_output" + consumer_name = f"{self._workflow_instance_id}_agent" # consumer name + group_name = "omappagent" # replace with your consumer group name + running_stream_name = f"{self._workflow_instance_id}_running" + self._check_redis_stream_exist(stream_name, group_name) + self._check_redis_stream_exist(running_stream_name, group_name) + while True: + # read running stream + running_messages = self._get_redis_stream_message(group_name, consumer_name, running_stream_name) + for stream, message_list in running_messages: + for message_id, message in message_list: + payload_data = self._get_message_payload(message) + if payload_data is None: + continue + progress = payload_data.get("progress") + message = payload_data.get("message") + history.append({"role": "assistant", "content": f"`{progress}: {message}`"}) + yield history + + container.get_connector('redis_stream_client')._client.xack( + running_stream_name, group_name, message_id + ) + # read output stream + messages = self._get_redis_stream_message(group_name, consumer_name, stream_name) + finish_flag = False + for stream, message_list in messages: + for message_id, message in message_list: + payload_data = self._get_message_payload(message) + if payload_data is None: + continue + message_item = payload_data["message"] + if message_item["type"] == MessageType.IMAGE_URL.value: + history.append({"role": "assistant", "content": {"path": message_item["content"]}}) + else: + history.append({"role": "assistant", "content": message_item["content"]}) + + yield history + + container.get_connector('redis_stream_client')._client.xack( + stream_name, group_name, message_id + ) + + # check finish flag + if "interaction_type" in payload_data and payload_data["interaction_type"] == 1: + finish_flag = True + if "content_status" in payload_data and payload_data["content_status"] == ContentStatus.END_ANSWER.value: + self._workflow_instance_id = None + finish_flag = True + + if finish_flag: + break + sleep(1) + + def processor_bot(self, history: list): + history.append({"role": "assistant", "content": f"`processing...`"}) + yield history + while True: + status = self._processor.get_workflow(workflow_id=self._workflow_instance_id).status + if status == 'COMPLETED': + history.append({"role": "assistant", "content": f"`completed`"}) + yield history + self._workflow_instance_id = None + break + sleep(1) + + def _get_redis_stream_message(self, group_name: str, consumer_name: str, stream_name: str): + messages = container.get_connector('redis_stream_client')._client.xreadgroup( + group_name, consumer_name, {stream_name: ">"}, count=1 + ) + messages = [ + (stream, [(message_id, {k.decode('utf-8'): v.decode('utf-8') for k, v in message.items()}) for message_id, message in message_list]) + for stream, message_list in messages + ] + return messages + + def _check_redis_stream_exist(self, stream_name: str, group_name: str): + try: + container.get_connector('redis_stream_client')._client.xgroup_create( + stream_name, group_name, id="0", mkstream=True + ) + except Exception as e: + logging.debug(f"Consumer group may already exist: {e}") + + def _get_message_payload(self, message: dict): + logging.info(f"Received running message: {message}") + payload = message.get("payload") + # check payload data + if not payload: + logging.error("Payload is empty") + return None + try: + payload_data = json.loads(payload) + except json.JSONDecodeError as e: + logging.error(f"Payload is not a valid JSON: {e}") + return None + return payload_data