Skip to content

Commit

Permalink
Add support for streaming output
Browse files Browse the repository at this point in the history
Add support for streaming output
  • Loading branch information
djwu563 committed Dec 10, 2024
1 parent 0a83dc8 commit 400d160
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
22 changes: 19 additions & 3 deletions omagent-core/src/omagent_core/clients/devices/cli/callback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import os
import sys

from colorama import Fore, Style
from omagent_core.utils.registry import registry
Expand All @@ -10,6 +11,7 @@
@registry.register_component()
class DefaultCallback(CallbackBase):
bot_id: str = ""
incomplete_flag: bool = False

def visualize_in_terminal(self, *args, **kwargs):
pass
Expand All @@ -19,6 +21,11 @@ def info(self, agent_id, progress, message):
f"\n{Fore.BLUE}info:{agent_id} {progress} {message}{Style.RESET_ALL}"
)

def send_incomplete(self, agent_id, msg, **kwargs):
sys.stdout.write(f"{Fore.BLUE}{msg}{Style.RESET_ALL}")
sys.stdout.flush()
self.incomplete_flag = True

def send_block(
self,
agent_id,
Expand All @@ -27,16 +34,25 @@ def send_block(
):
if kwargs.get('filter_special_symbols', False):
msg = self.filter_special_symbols_in_msg(msg)
logging.info(f"\n{Fore.BLUE}block:{msg}{Style.RESET_ALL}")
if self.incomplete_flag:
sys.stdout.write(f"{Fore.BLUE}{msg}{Style.RESET_ALL}")
sys.stdout.flush()
self.incomplete_flag = False
else:
logging.info(f"\n{Fore.BLUE}block:{msg}{Style.RESET_ALL}")


def error(self, agent_id, error_code, error_info, **kwargs):
logging.error(f"\n{Fore.RED}{error_info}{Style.RESET_ALL}")

def send_answer(self, agent_id, msg, **kwargs):
if kwargs.get('filter_special_symbols', False):
msg = self.filter_special_symbols_in_msg(msg)
logging.info(f"\n{Fore.BLUE}answer:{msg}{Style.RESET_ALL}")
if self.incomplete_flag:
sys.stdout.write(f"{Fore.BLUE}{msg}{Style.RESET_ALL}")
sys.stdout.flush()
self.incomplete_flag = False
else:
logging.info(f"\n{Fore.BLUE}answer:{msg}{Style.RESET_ALL}")

def finish(self, **kwargs):
def generate_tree(path, indent=""):
Expand Down
22 changes: 21 additions & 1 deletion omagent-core/src/omagent_core/clients/devices/webpage/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
self._config_path = config_path
self._workers = workers
self._workflow_instance_id = None
self._incomplete_message = ""
self._custom_css = """
#OmAgent {
height: 100vh !important;
Expand Down Expand Up @@ -193,16 +194,35 @@ def bot(self, history: list):
# read output stream
messages = self._get_redis_stream_message(group_name, consumer_name, stream_name)
finish_flag = False

for stream, message_list in messages:
for message_id, message in message_list:
incomplete_flag = False
payload_data = self._get_message_payload(message)
if payload_data is None:
continue
if payload_data["content_status"] == ContentStatus.INCOMPLETE.value:
incomplete_flag = True
message_item = payload_data["message"]
if message_item["type"] == MessageType.IMAGE_URL.value:
history.append({"role": "assistant", "content": {"path": message_item["content"]}})
else:
history.append({"role": "assistant", "content": message_item["content"]})
if incomplete_flag:
self._incomplete_message = self._incomplete_message + message_item["content"]
if history and history[-1]["role"] == "assistant":
history[-1]["content"] = self._incomplete_message
else:
history.append({"role": "assistant", "content": self._incomplete_message})
else:
if self._incomplete_message != "":
self._incomplete_message = self._incomplete_message + message_item["content"]
if history and history[-1]["role"] == "assistant":
history[-1]["content"] = self._incomplete_message
else:
history.append({"role": "assistant", "content": self._incomplete_message})
self._incomplete_message = ""
else:
history.append({"role": "assistant", "content": message_item["content"]})

yield history

Expand Down

0 comments on commit 400d160

Please sign in to comment.