Skip to content

Commit

Permalink
add webpage client
Browse files Browse the repository at this point in the history
add webpage client and update client docs
  • Loading branch information
djwu563 committed Dec 3, 2024
1 parent bd96f6c commit cd2fe79
Show file tree
Hide file tree
Showing 14 changed files with 587 additions and 8 deletions.
11 changes: 9 additions & 2 deletions docs/concepts/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@ Currently, there are two clients: `DefaultClient` and `AppClient`.
- Among them, either `interactor` or `processor` must be chosen to be passed in. `interactor` is the workflow used for interaction, and `processor` is the workflow used for image processing.
- At least one of `config_path` and `workers` must be passed in, or both can be passed. `config_path` is the path to the worker configuration file, and `workers` is a list of `Worker` instances.

`WebpageClient` is a web page chat window implemented with gradio, which can be used for interaction.
- The parameters of `WebpageClient` include `interactor`, `processor`, `config_path`, and `workers`.
- Among them, either `interactor` or `processor` must be chosen to be passed in.
- `interactor` is the workflow used for interaction, with a default port of **7860** after startup, and the access address is `http://127.0.0.1:7860`.
- `processor` is the workflow used for image processing, with a default port of **7861** after startup, and the access address is `http://127.0.0.1:7861`.
- At least one of `config_path` and `workers` must be passed in, or both can be passed. `config_path` is the path to the worker configuration file, and `workers` is a list of `Worker` instances.


The input for `DefaultClient` uses `AppInput`, and the output uses `DefaultCallback`. The input for `AppClient` uses `AppInput`, and the output uses `AppCallback`.
The input for `DefaultClient` uses `AppInput`, and the output uses `DefaultCallback`. The input for `AppClient` uses `AppInput`, and the output uses `AppCallback`. The input for `WebpageClient` uses `AppInput`, and the output uses `AppCallback`.

When writing an agent worker, you don't need to worry about which one to use. Simply call `self.input.read_input()` and `self.callback.send_xxx()`. Depending on whether `DefaultClient` or `AppClient` is instantiated, different input and output logic will be followed.
When writing an agent worker, you don't need to worry about which one to use. Simply call `self.input.read_input()` and `self.callback.send_xxx()`. Depending on whether `DefaultClient`, `AppClient`, or `WebpageClient` is instantiated, the corresponding input and output logic will be followed.

The input has only one method:
- `read_input(workflow_instance_id: str, input_prompt = "")`
Expand Down
5 changes: 5 additions & 0 deletions docs/concepts/debug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Debug Mode:
Set `debug: true` in the `conductor_config` section within `container.yaml` to enable debug mode. The debug mode has the following features:
1. Outputs more debug information.
2. On startup, it stops all workflows with the same name on the conductor and starts a new workflow.
3. There are no retries after a failure; each task is executed only once.
73 changes: 73 additions & 0 deletions examples/general_dnc/run_webpage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Import required modules and components
import os
from omagent_core.utils.container import container
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
from omagent_core.engine.workflow.task.simple_task import simple_task
from pathlib import Path
from omagent_core.utils.registry import registry
from omagent_core.clients.devices.webpage.client import WebpageClient
from omagent_core.utils.logger import logging
from omagent_core.engine.workflow.task.set_variable_task import SetVariableTask
from omagent_core.engine.workflow.task.switch_task import SwitchTask
from omagent_core.engine.workflow.task.do_while_task import DnCLoopTask
from agent.input_interface.dnc_input_interface import DnCInputIterface
# Initialize the "omagent" logger at INFO level
logging.init_logger("omagent", "omagent", level="INFO")


# Directory containing this script; configuration files are resolved against it
CURRENT_PATH = root_path = Path(__file__).parents[0]

# Import registered modules
registry.import_module()

# Register the RedisSTM component, then load the container settings from YAML
container.register_stm("RedisSTM")
container.from_config(CURRENT_PATH.joinpath('container.yaml'))

# Create the general DnC workflow
workflow = ConductorWorkflow(name='general_dnc')

# Workflow tasks
# 1. Input interface for user interaction
client_input_task = simple_task(
    task_def_name='DnCInputIterface',
    task_reference_name='dnc_input_interface',
)

# 2. Seed the workflow variables from the input task's output
init_set_variable_task = SetVariableTask(
    task_ref_name='set_variable_task',
    input_parameters={
        'agent_task': client_input_task.output('agent_task'),
        'last_output': client_input_task.output('last_output'),
    },
)

# 3. Conqueror task reads the workflow variables; the follow-up set-variable
#    task copies the conqueror's output back into them
conqueror_task = simple_task(
    task_def_name='TaskConqueror',
    task_reference_name='task_conqueror',
    inputs={
        'agent_task': '${workflow.variables.agent_task}',
        'last_output': '${workflow.variables.last_output}',
    },
)
conqueror_update_set_variable_task = SetVariableTask(
    task_ref_name='conqueror_update_set_variable_task',
    input_parameters={
        'agent_task': '${task_conqueror.output.agent_task}',
        'last_output': '${task_conqueror.output.last_output}',
    },
)

# 4. Divider task consumes the conqueror's output; the follow-up set-variable
#    task copies the divider's output into the workflow variables
divider_task = simple_task(
    task_def_name='TaskDivider',
    task_reference_name='task_divider',
    inputs={
        'agent_task': conqueror_task.output('agent_task'),
        'last_output': conqueror_task.output('last_output'),
    },
)
divider_update_set_variable_task = SetVariableTask(
    task_ref_name='divider_update_set_variable_task',
    input_parameters={
        'agent_task': '${task_divider.output.agent_task}',
        'last_output': '${task_divider.output.last_output}',
    },
)

# 5. Rescue task, also fed by the conqueror's output
rescue_task = simple_task(
    task_def_name='TaskRescue',
    task_reference_name='task_rescue',
    inputs={
        'agent_task': conqueror_task.output('agent_task'),
        'last_output': conqueror_task.output('last_output'),
    },
)

# 6. Conclude task, fed by the exit monitor's output
conclude_task = simple_task(
    task_def_name='Conclude',
    task_reference_name='task_conclude',
    inputs={
        'agent_task': '${task_exit_monitor.output.agent_task}',
        'last_output': '${task_exit_monitor.output.last_output}',
    },
)

# 7. Switch task that routes on the conqueror's 'switch_case_value' output:
#    "complex" -> divide, "failed" -> rescue, anything else -> update variables
switch_task = SwitchTask(
    task_ref_name='switch_task',
    case_expression=conqueror_task.output('switch_case_value'),
)
switch_task.default_case([conqueror_update_set_variable_task])
switch_task.switch_case("complex", [divider_task, divider_update_set_variable_task])
switch_task.switch_case("failed", rescue_task)

# 8. Exit monitor reads the workflow variables; the follow-up set-variable
#    task copies the monitor's output back into them
task_exit_monitor_task = simple_task(
    task_def_name='TaskExitMonitor',
    task_reference_name='task_exit_monitor',
    inputs={
        'agent_task': '${workflow.variables.agent_task}',
        'last_output': '${workflow.variables.last_output}',
    },
)
post_set_variable_task = SetVariableTask(
    task_ref_name='post_set_variable_task',
    input_parameters={
        'agent_task': '${task_exit_monitor.output.agent_task}',
        'last_output': '${task_exit_monitor.output.last_output}',
    },
)

# 9. DnC loop: conqueror + switch as the loop tasks, exit monitoring passed
#    as post_loop_exit
dncloop_task = DnCLoopTask(
    task_ref_name='dncloop_task',
    tasks=[conqueror_task, switch_task],
    post_loop_exit=[task_exit_monitor_task, post_set_variable_task],
)

# Execution flow: Input -> Initialize workflow variables -> DnC loop -> Conclude
workflow >> client_input_task >> init_set_variable_task >> dncloop_task >> conclude_task

# Register the workflow, overwriting any existing definition
workflow.register(overwrite=True)

# Create the webpage client for this workflow and start its interactor
config_path = CURRENT_PATH.joinpath('configs')
agent_client = WebpageClient(
    interactor=workflow,
    config_path=config_path,
    workers=[DnCInputIterface()],
)
agent_client.start_interactor()
44 changes: 44 additions & 0 deletions examples/step1_simpleVQA/run_webpage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Import required modules and components
from omagent_core.utils.container import container
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
from omagent_core.engine.workflow.task.simple_task import simple_task
from pathlib import Path
from omagent_core.utils.registry import registry
from omagent_core.clients.devices.webpage.client import WebpageClient
from omagent_core.utils.logger import logging
# Initialize the "omagent" logger at INFO level
logging.init_logger("omagent", "omagent", level="INFO")

# Worker class from this example's own agent package
from agent.input_interface.input_interface import InputInterface

# Directory containing this script; configuration files are resolved against it
CURRENT_PATH = root_path = Path(__file__).parents[0]

# Import the modules found under this example's agent directory
registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))

# Register the RedisSTM component, then load the container settings from YAML
container.register_stm("RedisSTM")
container.from_config(CURRENT_PATH.joinpath('container.yaml'))


# Create the simple VQA workflow
workflow = ConductorWorkflow(name='step1_simpleVQA')

# Workflow tasks
# 1. Input interface for user interaction
input_task = simple_task(
    task_def_name='InputInterface',
    task_reference_name='input_task',
)
# 2. Simple VQA step, fed with the 'user_instruction' output of the input task
vqa_task = simple_task(
    task_def_name='SimpleVQA',
    task_reference_name='simple_vqa',
    inputs={'user_instruction': input_task.output('user_instruction')},
)

# Execution flow: Input -> VQA
workflow >> input_task >> vqa_task

# Register the workflow
workflow.register(True)

# Create the webpage client for this workflow and start its interactor
config_path = CURRENT_PATH.joinpath('configs')
agent_client = WebpageClient(
    interactor=workflow,
    config_path=config_path,
    workers=[InputInterface()],
)
agent_client.start_interactor()
59 changes: 59 additions & 0 deletions examples/step2_outfit_with_switch/run_webpage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Import required modules from omagent_core
from omagent_core.utils.container import container # For dependency injection container
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow # For workflow management
from omagent_core.engine.workflow.task.simple_task import simple_task # For defining workflow tasks
from omagent_core.utils.registry import registry # For registering components
from omagent_core.clients.devices.webpage.client import WebpageClient # For webpage client interface
from omagent_core.utils.logger import logging # For logging functionality
logging.init_logger("omagent", "omagent", level="INFO") # Initialize the "omagent" logger at INFO level

# Resolve the directory that contains this script (not the process's working directory)
from pathlib import Path
CURRENT_PATH = Path(__file__).parents[0] # Directory of this file
# Import the modules found under this example's agent directory
registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))

# Append the directory two levels above this script to sys.path.
# This must happen before the `examples.` import below so that package can be resolved.
import sys
import os
sys.path.append(os.path.abspath(CURRENT_PATH.joinpath('../../')))

# Reuse the input interface worker defined in the step1_simpleVQA example
from examples.step1_simpleVQA.agent.input_interface.input_interface import InputInterface


# Register the RedisSTM component, then load the container settings
# (dependencies such as Redis connections and API endpoints) from YAML
container.register_stm("RedisSTM")
container.from_config(CURRENT_PATH.joinpath('container.yaml'))


# Create the outfit recommendation workflow under its own name
workflow = ConductorWorkflow(name='step2_outfit_with_switch')

# Workflow tasks
# 1. Input interface that collects the user's clothing request
input_task = simple_task(
    task_def_name='InputInterface',
    task_reference_name='input_task',
)
# 2. Decide whether weather information is needed
weather_decider_task = simple_task(
    task_def_name='WeatherDecider',
    task_reference_name='weather_decider',
    inputs={'user_instruction': input_task.output('user_instruction')},
)
# 3. Look up the current weather conditions when required
weather_searcher_task = simple_task(
    task_def_name='WeatherSearcher',
    task_reference_name='weather_searcher',
    inputs={'user_instruction': input_task.output('user_instruction')},
)
# 4. Produce the final clothing suggestions
recommendation_task = simple_task(
    task_def_name='OutfitRecommendation',
    task_reference_name='outfit_recommendation',
)

# Execution flow:
#   input -> weather decision -> (weather search, only when the decision
#   task returns 0) -> outfit recommendation
(
    workflow
    >> input_task
    >> weather_decider_task
    >> {0: weather_searcher_task}
    >> recommendation_task
)

# Register the workflow
workflow.register(True)

# Create the webpage client for this workflow and start its interactor
config_path = CURRENT_PATH.joinpath('configs')
agent_client = WebpageClient(
    interactor=workflow,
    config_path=config_path,
    workers=[InputInterface()],
)
agent_client.start_interactor()
65 changes: 65 additions & 0 deletions examples/step3_outfit_with_loop/run_webpage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Import core OmAgent components for workflow management and app functionality
from omagent_core.utils.container import container
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
from omagent_core.engine.workflow.task.simple_task import simple_task
from omagent_core.clients.devices.webpage.client import WebpageClient
from omagent_core.utils.logger import logging
# Initialize the "omagent" logger at INFO level
logging.init_logger("omagent", "omagent", level="INFO")

from omagent_core.utils.registry import registry

# Resolve the directory that contains this script (not the process's working directory)
from pathlib import Path
CURRENT_PATH = Path(__file__).parents[0]

# Import the modules found under this example's agent directory
registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))

# Append the directory two levels above this script to sys.path.
# This must happen before the `examples.` import below so that package can be resolved.
import sys
import os
sys.path.append(os.path.abspath(CURRENT_PATH.joinpath('../../')))

# Image input worker from this example, plus the recommendation worker from step2.
# NOTE(review): OutfitRecommendation is not referenced by name elsewhere in this script;
# the workflow only uses task_def_name='OutfitRecommendation'. Presumably it is imported
# for its import-time registration side effect — confirm before removing.
from agent.outfit_image_input.outfit_image_input import OutfitImageInput
from examples.step2_outfit_with_switch.agent.outfit_recommendation.outfit_recommendation import OutfitRecommendation

# Task type used to build the Q&A loop in the workflow
from omagent_core.engine.workflow.task.do_while_task import DoWhileTask


# Register the RedisSTM component, then load the container settings from YAML
container.register_stm("RedisSTM")
container.from_config(CURRENT_PATH.joinpath('container.yaml'))


# Create the looping outfit recommendation workflow
workflow = ConductorWorkflow(name='step3_outfit_with_loop')

# Workflow tasks
# 1. Initial outfit image input
image_input_task = simple_task(
    task_def_name='OutfitImageInput',
    task_reference_name='image_input',
)

# 2. Ask questions about the outfit
qa_task = simple_task(
    task_def_name='OutfitQA',
    task_reference_name='outfit_qa',
)

# 3. Decide whether enough information has been gathered
decider_task = simple_task(
    task_def_name='OutfitDecider',
    task_reference_name='outfit_decider',
)

# 4. Generate the final outfit recommendations
recommendation_task = simple_task(
    task_def_name='OutfitRecommendation',
    task_reference_name='outfit_recommendation',
)

# Q&A loop over the QA and decider tasks. The condition expression yields
# false once outfit_decider reports decision == true, and true otherwise.
outfit_qa_loop = DoWhileTask(
    task_ref_name='outfit_loop',
    tasks=[qa_task, decider_task],
    termination_condition='if ($.outfit_decider["decision"] == true){false;} else {true;} ',
)

# Execution flow: image input -> Q&A loop -> final recommendation
workflow >> image_input_task >> outfit_qa_loop >> recommendation_task

# Register the workflow with the conductor server
workflow.register(True)

# Create the webpage client with the image input worker and start its interactor
config_path = CURRENT_PATH.joinpath('configs')
agent_client = WebpageClient(
    interactor=workflow,
    config_path=config_path,
    workers=[OutfitImageInput()],
)
agent_client.start_interactor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Import required modules and components
from omagent_core.utils.container import container
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
from omagent_core.engine.workflow.task.simple_task import simple_task
from omagent_core.utils.logger import logging
# Initialize the "omagent" logger at INFO level
logging.init_logger("omagent", "omagent", level="INFO")

# Resolve the directory that contains this script (not the process's working directory)
from pathlib import Path
CURRENT_PATH = Path(__file__).parents[0]

# Import the modules found under this example's agent directory
from omagent_core.utils.registry import registry
registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))

# Webpage client, plus the image index listener worker
# (the listener is imported from the app device package, not the webpage one)
from omagent_core.clients.devices.webpage.client import WebpageClient
from omagent_core.clients.devices.app.image_index import ImageIndexListener

# Configure container with storage systems
# Register the RedisSTM and MilvusLTM components, then load the container settings
container.register_stm(stm='RedisSTM')
container.register_ltm(ltm='MilvusLTM')
container.from_config(CURRENT_PATH.joinpath('container.yaml'))

# Create the image storage workflow
workflow = ConductorWorkflow(name='step4_image_storage')

# Workflow tasks
# 1. Listen for new images
listener_task = simple_task(
    task_def_name='ImageIndexListener',
    task_reference_name='image_index_listener',
)

# 2. Preprocess images for storage, fed with the listener's 'output'
preprocessor_task = simple_task(
    task_def_name='OutfitImagePreprocessor',
    task_reference_name='outfit_image_preprocessor',
    inputs={'image_data': listener_task.output('output')},
)

# Execution flow: Listen -> Preprocess
workflow >> listener_task >> preprocessor_task

# Register the workflow
workflow.register(True)

# Create the webpage client with this workflow as its processor and start it
config_path = CURRENT_PATH.joinpath('configs')
agent_client = WebpageClient(
    processor=workflow,
    config_path=config_path,
    workers=[ImageIndexListener()],
)
agent_client.start_processor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from omagent_core.utils.container import container
from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow
from omagent_core.engine.workflow.task.simple_task import simple_task
from omagent_core.utils.logger import logging
# Initialize the "omagent" logger at INFO level
logging.init_logger("omagent", "omagent", level="INFO")

from omagent_core.clients.devices.webpage.client import WebpageClient

# Resolve the directory that contains this script (not the process's working directory)
from pathlib import Path
CURRENT_PATH = Path(__file__).parents[0]

# Import the modules found under this example's agent directory
from omagent_core.utils.registry import registry
registry.import_module(project_path=CURRENT_PATH.joinpath('agent'))

# Append the directory three levels above this script to sys.path.
# This must happen before the `examples.` imports below so that package can be resolved.
import sys
import os
sys.path.append(os.path.abspath(CURRENT_PATH.joinpath('../../../')))

# Loop task type, plus the decider and QA workers reused from the step3 example.
# NOTE(review): OutfitDecider and OutfitQA are not referenced by name elsewhere in this
# script; presumably they are imported for import-time registration — confirm before removing.
from omagent_core.engine.workflow.task.do_while_task import DoWhileTask
from examples.step3_outfit_with_loop.agent.outfit_decider.outfit_decider import OutfitDecider
from examples.step3_outfit_with_loop.agent.outfit_qa.outfit_qa import OutfitQA

# Register the RedisSTM and MilvusLTM storage components
container.register_stm(stm='RedisSTM')
container.register_ltm(ltm='MilvusLTM')

# Load the container settings from YAML
container.from_config(CURRENT_PATH.joinpath('container.yaml'))

# Create the outfit recommendation workflow
workflow = ConductorWorkflow(name='step4_outfit_recommendation')

# QA task that gathers user preferences
qa_task = simple_task(
    task_def_name='OutfitQA',
    task_reference_name='outfit_qa',
)

# Decision task that evaluates the QA responses
decider_task = simple_task(
    task_def_name='OutfitDecider',
    task_reference_name='outfit_decider',
)

# Outfit generation task
generation_task = simple_task(
    task_def_name='OutfitGeneration',
    task_reference_name='outfit_generation',
)

# Conclusion task, fed with the generation task's 'proposed_outfit' output
conclusion_task = simple_task(
    task_def_name='OutfitConclusion',
    task_reference_name='outfit_conclusion',
    inputs={'proposed_outfit': generation_task.output('proposed_outfit')},
)

# Q&A loop over the QA and decider tasks. The condition expression yields
# false once outfit_decider reports decision == true, and true otherwise.
outfit_qa_loop = DoWhileTask(
    task_ref_name='outfit_loop',
    tasks=[qa_task, decider_task],
    termination_condition='if ($.outfit_decider["decision"] == true){false;} else {true;} ',
)

# Execution flow: Q&A loop -> outfit generation -> conclusion
workflow >> outfit_qa_loop >> generation_task >> conclusion_task

# Register the workflow with conductor
workflow.register(True)

# Create the webpage client for this workflow and start its interactor
config_path = CURRENT_PATH.joinpath('configs')
agent_client = WebpageClient(interactor=workflow, config_path=config_path)
agent_client.start_interactor()
3 changes: 2 additions & 1 deletion omagent-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ dependencies = [
"json_repair >= 0.30.1",
"scenedetect >= 0.6.4",
"pydub >= 0.25.1",
"face_recognition >= 1.3.0"
"face_recognition >= 1.3.0",
"gradio >= 5.7.1"
]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def send_to_group(self, stream_name, group_name, data):
stream_name, group_name, id="0"
)
except Exception as e:
logging.info(f"Consumer group may already exist: {e}")
logging.debug(f"Consumer group may already exist: {e}")

def send_base_message(
self,
Expand Down
Loading

0 comments on commit cd2fe79

Please sign in to comment.