From 400d16018aff09239ba76bd878acf201764b1e37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BD=87=E9=99=B5?= <347090563@qq.com> Date: Tue, 10 Dec 2024 16:07:42 +0800 Subject: [PATCH] Add support for streaming output Add support for streaming output --- .../clients/devices/cli/callback.py | 22 ++++++++++++++++--- .../clients/devices/webpage/client.py | 22 ++++++++++++++++++- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/omagent-core/src/omagent_core/clients/devices/cli/callback.py b/omagent-core/src/omagent_core/clients/devices/cli/callback.py index 1f0c626d..f7dc8136 100644 --- a/omagent-core/src/omagent_core/clients/devices/cli/callback.py +++ b/omagent-core/src/omagent_core/clients/devices/cli/callback.py @@ -1,5 +1,6 @@ import datetime import os +import sys from colorama import Fore, Style from omagent_core.utils.registry import registry @@ -10,6 +11,7 @@ @registry.register_component() class DefaultCallback(CallbackBase): bot_id: str = "" + incomplete_flag: bool = False def visualize_in_terminal(self, *args, **kwargs): pass @@ -19,6 +21,11 @@ def info(self, agent_id, progress, message): f"\n{Fore.BLUE}info:{agent_id} {progress} {message}{Style.RESET_ALL}" ) + def send_incomplete(self, agent_id, msg, **kwargs): + sys.stdout.write(f"{Fore.BLUE}{msg}{Style.RESET_ALL}") + sys.stdout.flush() + self.incomplete_flag = True + def send_block( self, agent_id, @@ -27,16 +34,25 @@ def send_block( ): if kwargs.get('filter_special_symbols', False): msg = self.filter_special_symbols_in_msg(msg) - logging.info(f"\n{Fore.BLUE}block:{msg}{Style.RESET_ALL}") + if self.incomplete_flag: + sys.stdout.write(f"{Fore.BLUE}{msg}{Style.RESET_ALL}") + sys.stdout.flush() + self.incomplete_flag = False + else: + logging.info(f"\n{Fore.BLUE}block:{msg}{Style.RESET_ALL}") - def error(self, agent_id, error_code, error_info, **kwargs): logging.error(f"\n{Fore.RED}{error_info}{Style.RESET_ALL}") def send_answer(self, agent_id, msg, **kwargs): if kwargs.get('filter_special_symbols', False): msg = self.filter_special_symbols_in_msg(msg) - logging.info(f"\n{Fore.BLUE}answer:{msg}{Style.RESET_ALL}") + if self.incomplete_flag: + sys.stdout.write(f"{Fore.BLUE}{msg}{Style.RESET_ALL}") + sys.stdout.flush() + self.incomplete_flag = False + else: + logging.info(f"\n{Fore.BLUE}answer:{msg}{Style.RESET_ALL}") def finish(self, **kwargs): def generate_tree(path, indent=""): diff --git a/omagent-core/src/omagent_core/clients/devices/webpage/client.py b/omagent-core/src/omagent_core/clients/devices/webpage/client.py index 4c4bb497..f804709e 100644 --- a/omagent-core/src/omagent_core/clients/devices/webpage/client.py +++ b/omagent-core/src/omagent_core/clients/devices/webpage/client.py @@ -35,6 +35,7 @@ def __init__( self._config_path = config_path self._workers = workers self._workflow_instance_id = None + self._incomplete_message = "" self._custom_css = """ #OmAgent { height: 100vh !important; @@ -193,16 +194,35 @@ def bot(self, history: list): # read output stream messages = self._get_redis_stream_message(group_name, consumer_name, stream_name) finish_flag = False + for stream, message_list in messages: for message_id, message in message_list: + incomplete_flag = False payload_data = self._get_message_payload(message) if payload_data is None: continue + if payload_data["content_status"] == ContentStatus.INCOMPLETE.value: + incomplete_flag = True message_item = payload_data["message"] if message_item["type"] == MessageType.IMAGE_URL.value: history.append({"role": "assistant", "content": {"path": message_item["content"]}}) else: - history.append({"role": "assistant", "content": message_item["content"]}) + if incomplete_flag: + self._incomplete_message = self._incomplete_message + message_item["content"] + if history and history[-1]["role"] == "assistant": + history[-1]["content"] = self._incomplete_message + else: + history.append({"role": "assistant", "content": self._incomplete_message}) + else: + if self._incomplete_message != "": + self._incomplete_message = self._incomplete_message + message_item["content"] + if history and history[-1]["role"] == "assistant": + history[-1]["content"] = self._incomplete_message + else: + history.append({"role": "assistant", "content": self._incomplete_message}) + self._incomplete_message = "" + else: + history.append({"role": "assistant", "content": message_item["content"]}) yield history