Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiplatform communication agent. #85

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 245 additions & 66 deletions UtilityAgents/MultiplatformCommunicationCoordiator/coordinator/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,123 +39,302 @@

import logging
import sys
from collections import defaultdict
from typing import Dict, Tuple, List, Any

import gevent
from volttron.platform.agent import utils

from volttron.platform.vip.agent import Agent, Core, RPC
from volttron.platform.jsonrpc import RemoteError


utils.setup_logging()
_log = logging.getLogger(__name__)
__version__ = '1.0'
__version__ = '2.0'


class MultiplatformCoordinator(Agent):
"""
Multiplatform Coordinator
"""

def __init__(self, config_path, **kwargs):
def __init__(self, config_path: str, **kwargs):
super().__init__(**kwargs)
self.config = utils.load_config(config_path) if config_path else {}
self.vip.config.set_default("config", self.config)
self.configured_platforms = self.config.get("connected_platforms")
self.routing_table = {}
self.register_subscriptions = {}
self.subscription_registry = defaultdict(lambda: defaultdict(dict))
self.vip.config.subscribe(self.configure_main, actions=['NEW', 'UPDATE'], pattern='config')
self.error = False

def check_routing(self, platform, identity):
def check_routing(self, platform: str, identity: str) -> bool:
"""
Check if platform and identity are in the routing table.
:param platform: external platform
:type platform: str
:param identity: vip-identity of agent
:type identity: str
:return:

Checks if the given platform and identity exist in the routing table.

Args:
platform (str): The platform to check in the routing table.
identity (str): The identity to check for within the platform's entry in the routing table.

Returns:
bool: True if both the platform and identity are found in the routing table, False otherwise.

"""
if platform in self.routing_table and identity in self.routing_table[platform]:
return True
else:
return False

def configure_main(self, config_name, action, contents):
def update_routing_table(self, platform: str):
"""
Configure main setup routing table.
:param config_name:
:param action:
:param contents:
:return:
Update the routing table with the list of agents found on the specified platform

Args:
platform (str): The platform to query for the list of agents

This method queries control for a list of agents on the specified platform and
updates the local routing table with this information. If the query fails due to a timeout or
remote error, the routing table entry for the platform is set to an empty list.

Raises:
gevent.Timeout: If the RPC call times out
RemoteError: If there is an error in the remote call
"""
try:
agent_list = self.vip.rpc.call('control', 'peerlist', external_platform=platform).get(timeout=10)
_log.debug(f'Update routing table for {platform}: {agent_list}')
self.routing_table[platform] = agent_list
except (gevent.Timeout, RemoteError) as ex:
_log.debug(f'Exception on connection to {platform} -- {ex}')
self.routing_table[platform] = []

def create_routing_table(self):
"""
Creates and initializes the routing table for the configured platforms.

This method sets up an empty routing table and iterates over the list
of configured platforms to update the routing information for each platform.

The routing table is a dictionary where routing information such as
routes and destinations will be stored. The update_routing_table method
is called for each platform in the list of configured platforms, populating
the routing table with the necessary data.

Raises:
KeyError: If a required key is missing in the platform configuration.
ValueError: If there is an invalid value in the platform configuration.
"""
self.config = contents
self.configured_platforms = self.config.get("connected_platforms")
self.routing_table = {}
for platform in self.configured_platforms:
try:
agent_list = self.vip.rpc.call('control', 'peerlist', external_platform=platform).get(timeout=10)
self.routing_table[platform] = agent_list
except (gevent.Timeout, RemoteError) as ex:
_log.debug(f'Exception connection to {platform} -- {ex}')
self.routing_table[platform] = []
self.update_routing_table(platform)

def configure_main(self, config_name: str, action: str, contents: Dict[str, Any]):
"""
Configures the main settings for the platform.

Args:
config_name (str): The name of the configuration to apply.
action (str): The action to perform with the configuration.
contents (dict): The configuration settings to apply.

Sets:
self.config: The configuration contents.
self.configured_platforms: List of connected platforms from the configuration.

Calls:
create_routing_table: Method to create the routing table based on the configuration.
"""
self.config = contents
self.configured_platforms = self.config.get("connected_platforms")
self.create_routing_table()

@RPC.export
def relay(self, platform, identity, function, *args, **kwargs) -> any:
def relay(self, platform: str, identity: str, function: str, *args, **kwargs) -> Any:
"""
Relay rpc call from one remote to another.
:param platform: external platform
:param identity: vip-identity of agent
:param function: rpc function
:param args: rpc args
:param kwargs: rpc kwargs
:return: result from rpc call
:rtype: any
Export method to relay messages between platforms.

The relay method allows communication between different platforms by routing
a given function call along with its arguments to the specified platform and
identity. If the routing check succeeds, the function attempts a remote
procedure call (RPC) with a specified timeout. Potential exceptions are handled
and logged.

Args:
platform (str): The target platform to which the message is to be relayed.
identity (str): The identity of the recipient on the target platform.
function (str): The name of the function to be called remotely.
*args: Variable length argument list to pass to the remote function.
**kwargs: Arbitrary keyword arguments to pass to the remote function.

Returns:
Any: The result from the remote procedure call, if successful; otherwise, None.
"""
_log.debug(f'Relaying message: {platform} - identity: {identity}')
result = None
if self.check_routing(platform, identity):
try:
result = self.vip.rpc.call(identity, function, *args, **kwargs, external_platform=platform).get(timeout=10)
result = self.vip.rpc.call(identity, function, *args, **kwargs,
external_platform=platform).get(timeout=10)
except (gevent.Timeout, RemoteError) as ex:
_log.debug(f'Exception connection to {platform} - identity: {identity} -- function: {function} -- {ex}')
self.update_routing_table(platform)
return result

@RPC.export
def register_subscription(self, data) -> bool:
def register_subscription(self, data: Dict[str, str]) -> bool:
"""
Register a subscription for a given topic for a remote on a different remote.
:param data: dict {'platform': str, 'topic': str, 'identity': str, 'function': str}
:return: True if successful else False
:rtype: bool
Registers a subscription for the given topic with the pub/sub service.

Args:
data (dict): A dictionary containing subscription details:
- 'topic' (str): The topic to subscribe to.
- 'all_platforms' (bool, optional): Whether to subscribe across all platforms. Defaults to False.

Returns:
bool: True if subscription was successful, False otherwise.

Raises:
gevent.Timeout: If the subscription attempt times out.
RemoteError: If there is an error from the remote service.
"""
_log.debug(f'Registering subscription: {data}')
try:
topic = data.pop('topic')
self.vip.pubsub.subscribe(peer='pubsub', prefix=topic, callback=self.subscription_handler, all_platforms=True).get(timeout=10)
self.register_subscriptions[topic] = data
topic = data['topic']
all_platforms = data.get('all_platforms', False)
self.vip.pubsub.subscribe(peer='pubsub', prefix=topic, callback=self.subscription_handler,
all_platforms=all_platforms).get(timeout=10)
self.build_subscription_map(data)
return True
except (gevent.Timeout, RemoteError) as ex:
_log.error(f'Failed to set configurations: {ex}', exc_info=True)
_log.debug(f'Failed to set configurations: {ex}', exc_info=True)
return False

def subscription_handler(self, peer, sender, bus, topic, headers, message) -> None:
"""
Handle subscriptions from remotes platforms.
:return: None
:rtype: None
"""
_log.debug(f'Received message from {peer} on {topic}: {message}')
if topic in self.register_subscriptions:
data = self.register_subscriptions[topic]
identity = data.get('identity', 'unknown')
platform = data.get('platform', 'unknown')
function = data.get('function', 'unknown')
# fuctions = self.vip.rpc.call(identity, 'inspect', external_platform=platform).get(timeout=10)
if self.check_routing(platform, identity):
try:
self.vip.rpc.call(identity, function, message, external_platform=platform).get(timeout=10)
except (gevent.Timeout, RemoteError) as ex:
_log.error(f'Failed to call {function} on {identity} on {platform}: {ex}', exc_info=True)
def build_subscription_map(self, data: Dict[str, str]):
"""
Builds a subscription map from the provided data.

Unpacks the subscription payload and updates the routing table and register subscriptions.

Args:
data: The subscription data to process. Expected to contain topic, identity, platform, and callback information.

Returns:
None
"""
topic, platform, identity, callback = self.unpack_subscription_payload(data)
self.update_routing_table(platform)
_log.debug(f'Updating routing table: {self.routing_table}')
self.subscription_registry[topic][platform][identity] = callback
self.update_routing_table(platform)
self.unregister_subscription()

def unregister_subscription(self):
"""
Cleans up invalid subscriptions and empty topics from the current subscription list.

The function performs the following actions:
- Calls the `cleanup_invalid_subscriptions` method to remove any invalid subscriptions.
- Calls the `cleanup_empty_topics` method to remove any empty topics.
"""
_log.debug(f'Running unsubscribe methods!')
self.cleanup_invalid_subscriptions()
self.cleanup_empty_topics()

def cleanup_invalid_subscriptions(self):
"""
Removes invalid subscriptions from the subscription_registry dictionary.

Identifies invalid subscriptions by checking if routing exists for each identity in the subscription.
If routing does not exist, the identity is marked for cleanup.

This function iterates through each topic, platform, and identity in the subscription_registry dictionary.
If an identity is found to be invalid, it is added to a temporary cleanup dictionary.
After identifying all invalid subscriptions, it removes them from the original subscription_registry dictionary.
"""
cleanup = defaultdict(lambda: defaultdict(list))

for topic, topic_payload in self.subscription_registry.items():
for platform, identities in topic_payload.items():
for identity in identities:
if not self.check_routing(platform, identity):
cleanup[topic][platform].append(identity)
_log.debug(f'Running cleanup_invalid_subscriptions {cleanup}')
for topic, platforms in cleanup.items():
for platform, identities in platforms.items():
for identity in identities:
self.subscription_registry[topic][platform].pop(identity, None)
_log.debug(f'Execute cleanup_invalid_subscriptions {self.subscription_registry}')

def cleanup_empty_topics(self):
"""
Cleans up empty topics from the subscription registry and unsubscribes them from the pubsub system.

This method iterates over the registered subscriptions to identify topics with no associated payloads.
For each identified empty topic, it removes the topic from the subscription registry and unsubscribes from the pubsub system.

"""
empty_topics = []
all_topics = list(self.subscription_registry.keys())
for topic in all_topics:
if all(not payload for payload in self.subscription_registry[topic].values()):
empty_topics.append(topic)
_log.debug(f'Running cleanup_empty_topics {empty_topics}')
for topic in empty_topics:
self.subscription_registry.pop(topic, None)
self.vip.pubsub.unsubscribe(peer='pubsub',
prefix=topic,
callback=self.subscription_handler).get(timeout=10)

@staticmethod
def unpack_subscription_payload(data: Dict[str, str]) -> Tuple[str, str, str, str]:
"""
Unpacks subscription payload from a dictionary and returns individual components.

Args:
data (dict[str, str]): The dictionary containing subscription information.

Returns:
tuple[str, str, str, str]: A tuple containing the topic, platform, identity, and callback function name.
"""
topic = data['topic']
identity = data['identity']
platform = data['platform']
callback = data['function']
return topic, platform, identity, callback

def subscription_handler(self, peer: str, sender: str, bus: str,
topic: str, headers: str, message: Any) -> None:
"""
Handles incoming subscriptions by forwarding messages to appropriate subscribed platforms and identities.

Args:
peer (str): The peer from which the message is received.
sender (str): The sender of the message.
bus (str): The bus on which the message is received.
topic (str): The topic of the message.
headers (dict): Headers associated with the message.
message (Any): The message payload.

Logs:
Logs debug information about received message and callback routing.

Exceptions:
Handles gevent.Timeout and RemoteError exceptions during RPC call and logs errors appropriately.

"""
_log.debug(f'Received message from {peer} on {topic}')
on_error = False
subscriptions = self.subscription_registry.get(topic, {})
for platform, platform_payload in subscriptions.items():
for identity, callback in platform_payload.items():
_log.debug(f'Sending to {platform} -- {identity} with callback {callback}')
if self.check_routing(platform, identity):
try:
self.vip.rpc.call(identity, callback, headers, message,
external_platform=platform).get(timeout=10)
except (gevent.Timeout, RemoteError) as ex:
on_error = True
self.update_routing_table(platform)
_log.error(f'Failed to call {callback} for {identity} on {platform}: {ex}')
if on_error:
self.unregister_subscription()


def main(argv=sys.argv):
Expand Down