Skip to content

Commit

Permalink
Merge branch '0.2' into notebook-optional-user-input
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet authored Nov 21, 2024
2 parents d102543 + 2d52bbc commit 75b33cb
Show file tree
Hide file tree
Showing 45 changed files with 422 additions and 267 deletions.
9 changes: 8 additions & 1 deletion autogen/agentchat/contrib/capabilities/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ def apply_transform(self, messages: List[Dict]) -> List[Dict]:
if remaining_count == 0:
break

if not transforms_util.is_tool_call_valid(truncated_messages):
truncated_messages.pop()

return truncated_messages

def get_logs(self, pre_transform_messages: List[Dict], post_transform_messages: List[Dict]) -> Tuple[str, bool]:
Expand Down Expand Up @@ -229,6 +232,9 @@ def apply_transform(self, messages: List[Dict]) -> List[Dict]:
processed_messages_tokens += msg_tokens
processed_messages.insert(0, msg)

if not transforms_util.is_tool_call_valid(processed_messages):
processed_messages.pop()

return processed_messages

def get_logs(self, pre_transform_messages: List[Dict], post_transform_messages: List[Dict]) -> Tuple[str, bool]:
Expand Down Expand Up @@ -319,7 +325,7 @@ def __init__(
text_compressor: Optional[TextCompressor] = None,
min_tokens: Optional[int] = None,
compression_params: Dict = dict(),
cache: Optional[AbstractCache] = Cache.disk(),
cache: Optional[AbstractCache] = None,
filter_dict: Optional[Dict] = None,
exclude_filter: bool = True,
):
Expand Down Expand Up @@ -391,6 +397,7 @@ def apply_transform(self, messages: List[Dict]) -> List[Dict]:

cache_key = transforms_util.cache_key(message["content"], self._min_tokens)
cached_content = transforms_util.cache_content_get(self._cache, cache_key)

if cached_content is not None:
message["content"], savings = cached_content
else:
Expand Down
4 changes: 4 additions & 0 deletions autogen/agentchat/contrib/capabilities/transforms_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,7 @@ def should_transform_message(message: Dict[str, Any], filter_dict: Optional[Dict
return True

return len(filter_config([message], filter_dict, exclude)) > 0


def is_tool_call_valid(messages: List[Dict[str, Any]]) -> bool:
return messages[0].get("role") != "tool"
4 changes: 4 additions & 0 deletions autogen/agentchat/groupchat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,10 @@ async def a_run_chat(
else:
# admin agent is not found in the participants
raise
except NoEligibleSpeaker:
# No eligible speaker, terminate the conversation
break

if reply is None:
break
# The speaker sends the message without requesting a reply
Expand Down
2 changes: 1 addition & 1 deletion autogen/oai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ def extract_text_or_completion_object(

def _throttle_api_calls(self, idx: int) -> None:
"""Rate limit api calls."""
if self._rate_limiters[idx]:
if idx < len(self._rate_limiters) and self._rate_limiters[idx]:
limiter = self._rate_limiters[idx]

assert limiter is not None
Expand Down
2 changes: 1 addition & 1 deletion autogen/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.36"
__version__ = "0.2.38"
1 change: 1 addition & 0 deletions samples/apps/cap/py/autogencap/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
xsub_url: str = "tcp://127.0.0.1:5556"
router_url: str = "tcp://127.0.0.1:5557"
dealer_url: str = "tcp://127.0.0.1:5558"
USE_COLOR_LOGGING = True
Original file line number Diff line number Diff line change
@@ -1,57 +1,38 @@
import threading
import traceback

import zmq
from .actor_runtime import IMessageReceiver, IMsgActor, IRuntime
from .debug_log import Debug, Info

from .Config import xpub_url
from .DebugLog import Debug, Error, Info


class Actor:
class Actor(IMsgActor):
def __init__(self, agent_name: str, description: str, start_thread: bool = True):
"""Initialize the Actor with a name, description, and threading option."""
self.actor_name: str = agent_name
self.agent_description: str = description
self.run = False
self._start_event = threading.Event()
self._start_thread = start_thread
self._msg_receiver: IMessageReceiver = None
self._runtime: IRuntime = None

def on_connect(self, network):
Debug(self.actor_name, f"is connecting to {network}")
def on_connect(self):
"""Connect the actor to the runtime."""
Debug(self.actor_name, f"is connecting to {self._runtime}")
Debug(self.actor_name, "connected")

def on_txt_msg(self, msg: str, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming text messages."""
Info(self.actor_name, f"InBox: {msg}")
return True

def on_bin_msg(self, msg: bytes, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming binary messages."""
Info(self.actor_name, f"Msg: receiver=[{receiver}], msg_type=[{msg_type}]")
return True

def _msg_loop_init(self):
Debug(self.actor_name, "recv thread started")
self._socket: zmq.Socket = self._context.socket(zmq.SUB)
self._socket.setsockopt(zmq.RCVTIMEO, 500)
self._socket.connect(xpub_url)
str_topic = f"{self.actor_name}"
Debug(self.actor_name, f"subscribe to: {str_topic}")
self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}")
self._start_event.set()

def get_message(self):
try:
topic, msg_type, sender_topic, msg = self._socket.recv_multipart()
topic = topic.decode("utf-8") # Convert bytes to string
msg_type = msg_type.decode("utf-8") # Convert bytes to string
sender_topic = sender_topic.decode("utf-8") # Convert bytes to string
except zmq.Again:
return None # No message received, continue to next iteration
except Exception as e:
Error(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
return None
return topic, msg_type, sender_topic, msg

def dispatch_message(self, message):
"""Dispatch the received message based on its type."""
if message is None:
return
topic, msg_type, sender_topic, msg = message
Expand All @@ -65,40 +46,50 @@ def dispatch_message(self, message):
if not self.on_bin_msg(msg, msg_type, topic, sender_topic):
self.run = False

def get_message(self):
"""Retrieve a message from the runtime implementation."""
return self._msg_receiver.get_message()

def _msg_loop(self):
"""Main message loop for receiving and dispatching messages."""
try:
self._msg_loop_init()
self._msg_receiver = self._runtime.get_new_msg_receiver()
self._msg_receiver.init(self.actor_name)
self._start_event.set()
while self.run:
message = self.get_message()
message = self._msg_receiver.get_message()
self.dispatch_message(message)
except Exception as e:
Debug(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
finally:
self.run = False
# In case there was an exception at startup signal
# the main thread.
self._start_event.set()
self.run = False
Debug(self.actor_name, "recv thread ended")

def on_start(self, context: zmq.Context):
self._context = context
self.run: bool = True
def on_start(self, runtime: IRuntime):
"""Start the actor and its message receiving thread if applicable."""
self._runtime = runtime # Save the runtime
self.run = True
if self._start_thread:
self._thread = threading.Thread(target=self._msg_loop)
self._thread.start()
self._start_event.wait()
else:
self._msg_loop_init()
self._msg_receiver = self._runtime.get_new_msg_receiver()
self._msg_receiver.init(self.actor_name)

def disconnect_network(self, network):
"""Disconnect the actor from the network."""
Debug(self.actor_name, f"is disconnecting from {network}")
Debug(self.actor_name, "disconnected")
self.stop()

def stop(self):
"""Stop the actor and its message receiver."""
self.run = False
if self._start_thread:
self._thread.join()
self._socket.setsockopt(zmq.LINGER, 0)
self._socket.close()
self._msg_receiver.stop()
83 changes: 83 additions & 0 deletions samples/apps/cap/py/autogencap/actor_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from abc import ABC, abstractmethod
from typing import Any, Optional, Tuple


class IActorConnector(ABC):
"""
Abstract base class for actor connectors. Each runtime will have a different implementation.
Obtain an instance of the correct connector from the runtime by calling the runtime's find_by_xyz
method.
"""

@abstractmethod
def send_txt_msg(self, msg: str) -> None:
"""
Send a text message to the actor.
Args:
msg (str): The text message to send.
"""
pass

@abstractmethod
def send_bin_msg(self, msg_type: str, msg: bytes) -> None:
"""
Send a binary message to the actor.
Args:
msg_type (str): The type of the binary message.
msg (bytes): The binary message to send.
"""
pass

@abstractmethod
def send_proto_msg(self, msg: Any) -> None:
"""
Send a protocol buffer message to the actor.
Args:
msg (Any): The protocol buffer message to send.
"""
pass

@abstractmethod
def send_recv_proto_msg(
self, msg: Any, num_attempts: int = 5
) -> Tuple[Optional[str], Optional[str], Optional[bytes]]:
"""
Send a protocol buffer message and receive a response from the actor.
Args:
msg (Any): The protocol buffer message to send.
num_attempts (int, optional): Number of attempts to send and receive. Defaults to 5.
Returns:
Tuple[Optional[str], Optional[str], Optional[bytes]]: A tuple containing the topic,
message type, and response message, or None if no response is received.
"""
pass

@abstractmethod
def send_recv_msg(
self, msg_type: str, msg: bytes, num_attempts: int = 5
) -> Tuple[Optional[str], Optional[str], Optional[bytes]]:
"""
Send a binary message and receive a response from the actor.
Args:
msg_type (str): The type of the binary message.
msg (bytes): The binary message to send.
num_attempts (int, optional): Number of attempts to send and receive. Defaults to 5.
Returns:
Tuple[Optional[str], Optional[str], Optional[bytes]]: A tuple containing the topic,
message type, and response message, or None if no response is received.
"""
pass

@abstractmethod
def close(self) -> None:
"""
Close the actor connector and release any resources.
"""
pass
Loading

0 comments on commit 75b33cb

Please sign in to comment.