diff --git a/README.md b/README.md index 17be876..ee54608 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,18 @@ # volttron-lib-base-driver +[![Eclipse VOLTTRON™](https://img.shields.io/badge/Eclips%20VOLTTRON--red.svg)](https://volttron.readthedocs.io/en/latest/) ![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg) -![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg) ![Passing?](https://github.com/eclipse-volttron/volttron-lib-base-driver/actions/workflows/run-tests.yml/badge.svg) [![pypi version](https://img.shields.io/pypi/v/volttron-lib-base-driver.svg)](https://pypi.org/project/volttron-lib-base-driver/) -# Requires +## Requirements * python >= 3.10 -* volttron >= 10.0 - +* volttron-core >= 2.0.0rc0 # Documentation -More detailed documentation can be found on [ReadTheDocs](https://volttron.readthedocs.io/en/modular/). The RST source +More detailed information about the VOLTTRON Driver Framework can be found [ReadTheDocs](https://eclipse-volttron.readthedocs.io/en/latest/external-docs/volttron-platform-driver/index.html#platform-driver-framework). The RST source of the documentation for this component is located in the "docs" directory of this repository. ## Installation @@ -25,13 +24,7 @@ Information on how to install of the VOLTTRON platform can be found Install the library. You have two options. You can install this library using the version on PyPi: ```shell -pip install volttron-lib-base-driver -``` - -Or you can install the local version of this library from this repo: - -```shell -pip install -e . +poetry add --directory $VOLTTRON_HOME volttron-lib-base-driver ``` ## Development diff --git a/pyproject.toml b/pyproject.toml index 3a8def3..11e7b00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ ignore_missing_imports = true [tool.poetry] name = "volttron-lib-base-driver" -version = "0.2.1rc0" +version = "2.0.0rc0" description = "Volttron Driver libraries used to support development within the Volttron Driver Framework." authors = ["Mark Bonicillo "] license = "Apache License 2.0" @@ -52,8 +52,7 @@ classifiers = [ [tool.poetry.dependencies] python = ">=3.10,<4.0" -volttron = ">=10.0.1a43,<11.0" - +volttron-core = ">=2.0.0rc0" [tool.poetry.group.dev.dependencies] pytest = "^6.2.5" diff --git a/src/volttron/driver/base/config.py b/src/volttron/driver/base/config.py new file mode 100644 index 0000000..eda15e4 --- /dev/null +++ b/src/volttron/driver/base/config.py @@ -0,0 +1,80 @@ +from datetime import timedelta +from enum import Enum +from pydantic import BaseModel, computed_field, ConfigDict, Field, field_validator + +# TODO: Wire up the data_source field to poll scheduling (everything is currently short-poll because this isn't used). +# TODO: Should NEVER actually be an option? Could it just be None? +class DataSource(Enum): + SHORT_POLL = "short_poll" + LONG_POLL = "long_poll" + NEVER = "never" + POLL_ONCE = "poll_once" + STATIC = "static" + + +class EquipmentConfig(BaseModel): + model_config = ConfigDict(validate_assignment=True, populate_by_name=True) + active: bool | None = None + group: str | None = None + # TODO: If this needs to be an int, we may need to use milliseconds someplace. + polling_interval: int | None = Field(default=None, alias='interval') + publish_single_depth: bool | None = Field(default=None, alias='publish_depth_first_single') + publish_single_breadth: bool | None = Field(default=None, alias='publish_breadth_first_single') + publish_multi_depth: bool | None = Field(default=None, alias='publish_depth_first_multi') + publish_multi_breadth: bool | None = Field(default=None, alias='publish_breadth_first_multi') + publish_all_depth: bool | None = Field(default=None, alias='publish_depth_first_all') + publish_all_breadth: bool | None = Field(default=None, alias='publish_breadth_first_all') + reservation_required_for_write: bool = False + + @field_validator('polling_interval', mode='before') + @classmethod + def _normalize_polling_interval(cls, v): + # TODO: This does not match int above, but we may need to convert to ms in calculations. + return None if v == '' or v is None else float(v) + +class PointConfig(EquipmentConfig): + data_source: DataSource = Field(default='short_poll', alias='Data Source') + notes: str = Field(default='', alias='Notes') + reference_point_name: str = Field(default='', alias='Reference Point Name') + stale_timeout_configured: float | None = Field(default=None, alias='stale_timeout') + stale_timeout_multiplier: float = Field(default=3) + units: str = Field(default='', alias='Units') + units_details: str = Field(default='', alias='Unit Details') + volttron_point_name: str = Field(alias='Volttron Point Name') + writable: bool = Field(default=False, alias='Writable') + + @field_validator('data_source', mode='before') + @classmethod + def _normalize_data_source(cls, v): + # TODO: This never converts to DataSource. + # TODO: Data Source enum needs something to tell Data Point how to serialize it, otherwise enable/disable will fail. + return v.lower() + + @computed_field + @property + def stale_timeout(self) -> timedelta | None: + if self.stale_timeout_configured is None and self.polling_interval is None: + return None + else: + return timedelta(seconds=(self.stale_timeout_configured + if self.stale_timeout_configured is not None + else self.polling_interval * self.stale_timeout_multiplier)) + + @stale_timeout.setter + def stale_timeout(self, value): + self.stale_timeout_configured = value + + +class DeviceConfig(EquipmentConfig): + all_publish_interval: float = 0.0 + allow_duplicate_remotes: bool = False + equipment_specific_fields: dict = {} + registry_config: list[PointConfig] = [] + + +class RemoteConfig(BaseModel): + model_config = ConfigDict(extra='allow', validate_assignment=True) + debug: bool = False + driver_type: str + heart_beat_point: str | None = None + module: str | None = None \ No newline at end of file diff --git a/src/volttron/driver/base/driver.py b/src/volttron/driver/base/driver.py index dcbb121..a9a44ef 100644 --- a/src/volttron/driver/base/driver.py +++ b/src/volttron/driver/base/driver.py @@ -22,208 +22,71 @@ # ===----------------------------------------------------------------------=== # }}} -import datetime -import importlib -import inspect +import gevent import logging -import random import traceback + from typing import cast +from weakref import WeakSet -import gevent -from volttron.client.messaging import headers as headers_mod -from volttron.client.messaging.topics import ( - DEVICES_PATH, - DEVICES_VALUE, - DRIVER_TOPIC_ALL, - DRIVER_TOPIC_BASE, -) -from volttron.client.vip.agent import BasicAgent, Core -from volttron.client.vip.agent.errors import Again, VIPError -from volttron.utils import ( - format_timestamp, - get_aware_utc_now, - get_class, - get_module, - get_subclasses, - setup_logging, -) -from volttron.driver.base.driver_locks import publish_lock +from volttron.client.vip.agent import Agent, BasicAgent, Core + from volttron.driver.base.interfaces import BaseInterface +from volttron.driver.base.config import PointConfig, RemoteConfig +from .utils import publication_headers, publish_wrapper -setup_logging() _log = logging.getLogger(__name__) -class DriverAgent(BasicAgent): - - def __init__(self, - parent, - config, - time_slot, - driver_scrape_interval, - device_path, - group, - group_offset_interval, - default_publish_depth_first_all=True, - default_publish_breadth_first_all=True, - default_publish_depth_first=True, - default_publish_breadth_first=True, - **kwargs): - super(DriverAgent, self).__init__(**kwargs) +class DriverAgent: + def __init__(self, config: RemoteConfig, core, equipment_model, scalability_test, tz: str, unique_id: any, + vip: Agent.Subsystems): + self.config: RemoteConfig = config + self.core = core + self.equipment_model = equipment_model # TODO: This should probably move out of the agent and into the base or a library. + self.scalability_test = scalability_test # TODO: If this is used from here, it should probably be in the base driver. + self.tz: str = tz # TODO: This won't update here if it is updated in the agent. Can it's use be moved out of here? + self.unique_id: any = unique_id + self.vip: Agent.Subsystems = vip + + # State Variables + self.equipment = WeakSet() self.heart_beat_value = 0 - self.device_name = '' - #Use the parent's vip connection - self.parent = parent - self.vip = parent.vip - self.config = config - self.device_path = device_path - - self.update_publish_types(default_publish_depth_first_all, - default_publish_breadth_first_all, default_publish_depth_first, - default_publish_breadth_first) - - try: - interval = int(config.get("interval", 60)) - if interval < 1.0: - raise ValueError - except ValueError: - _log.warning("Invalid device scrape interval {}. Defaulting to 60 seconds.".format( - config.get("interval"))) - interval = 60 - - self.interval = interval - self.periodic_read_event = None - - self.update_scrape_schedule(time_slot, driver_scrape_interval, group, - group_offset_interval) - - def update_publish_types(self, publish_depth_first_all, publish_breadth_first_all, - publish_depth_first, publish_breadth_first): - """Setup which publish types happen for a scrape. - Values passed in are overridden by settings in the specific device configuration.""" - self.publish_depth_first_all = bool( - self.config.get("publish_depth_first_all", publish_depth_first_all)) - self.publish_breadth_first_all = bool( - self.config.get("publish_breadth_first_all", publish_breadth_first_all)) - self.publish_depth_first = bool(self.config.get("publish_depth_first", - publish_depth_first)) - self.publish_breadth_first = bool( - self.config.get("publish_breadth_first", publish_breadth_first)) - - def update_scrape_schedule(self, time_slot, driver_scrape_interval, group, - group_offset_interval): - self.time_slot_offset = (time_slot * driver_scrape_interval) + (group * - group_offset_interval) - self.time_slot = time_slot - self.group = group - - _log.debug("{} group: {}, time_slot: {}, offset: {}".format(self.device_path, group, - time_slot, - self.time_slot_offset)) - - if self.time_slot_offset >= self.interval: - _log.warning( - "Scrape offset exceeds interval. Required adjustment will cause scrapes to double up with other devices." - ) - while self.time_slot_offset >= self.interval: - self.time_slot_offset -= self.interval - - #check weather or not we have run our starting method. - if not self.periodic_read_event: - return - - self.periodic_read_event.cancel() - - next_periodic_read = self.find_starting_datetime(get_aware_utc_now()) - - self.periodic_read_event = self.core.schedule(next_periodic_read, self.periodic_read, - next_periodic_read) - - def find_starting_datetime(self, now): - midnight = now.replace(hour=0, minute=0, second=0, microsecond=0) - seconds_from_midnight = (now - midnight).total_seconds() - interval = self.interval - - offset = seconds_from_midnight % interval - - if not offset: - return now - - previous_in_seconds = seconds_from_midnight - offset - next_in_seconds = previous_in_seconds + interval - - from_midnight = datetime.timedelta(seconds=next_in_seconds) - return midnight + from_midnight + datetime.timedelta(seconds=self.time_slot_offset) - - def get_interface(self, driver_type: str, driver_config: dict, - registry_config: list) -> BaseInterface: - """Returns an instance of the interface - - :param driver_type: The name of the driver - :type driver_type: str - :param driver_config: The configuration of the driver - :type driver_config: dict - :param registry_config: A list of registry points reprsented as dictionaries - :type registry_config: list - - :return: Returns an instance of a Driver that is a subclass of BaseInterface - :rtype: BaseInterface - - :raises ValueError: Raises ValueError if no subclasses are found. - """ - module = self.get_driver_module(driver_config, driver_type) - base_interface_class = get_class('volttron.driver.base.interfaces', 'BaseInterface') - subclasses = get_subclasses(module, base_interface_class) - - klass = subclasses[0] - _log.debug(f"Instantiating driver: {klass}") - interface = klass(vip=self.vip, core=self.core, device_path=self.device_path) - - _log.debug(f"Configuring driver with this configuration: {driver_config}") - interface.configure(driver_config, registry_config) - - return cast(BaseInterface, interface) - - def get_driver_module(self, driver_config, driver_type): - if driver_config.get("driver_module") is not None: - return get_module(driver_config.get("driver_module")) - return get_module(f"volttron.driver.interfaces.{driver_type}.{driver_type}") - - @Core.receiver('onstart') - def starting(self, sender, **kwargs): - self.setup_device() - - # interval = self.config.get("interval", 60) - # self.core.periodic(interval, self.periodic_read, wait=None) - - next_periodic_read = self.find_starting_datetime(get_aware_utc_now()) - - self.periodic_read_event = self.core.schedule(next_periodic_read, self.periodic_read, - next_periodic_read) - - self.all_path_depth, self.all_path_breadth = self.get_paths_for_point(DRIVER_TOPIC_ALL) - - def setup_device(self): - - config = self.config - driver_config = config["driver_config"] - driver_type = config["driver_type"] - registry_config = config.get("registry_config") - - self.heart_beat_point = config.get("heart_beat_point") + self.interface = None + self.publishers = {} try: - self.interface = self.get_interface(driver_type, driver_config, registry_config) + # TODO: What happens if we have multiple device nodes on this remote? + # Are we losing all settings but the first? + klass = BaseInterface.get_interface_subclass(self.config.driver_type) + interface = klass(self.config, self.core, self.vip) + self.interface = cast(BaseInterface, interface) except ValueError as e: _log.error(f"Failed to setup device: {e}") raise e - self.meta_data = {} + def add_registers(self, registry_config: list[PointConfig], base_topic: str): + """ + Configure a set of registers on this remote. - for point in self.interface.get_register_names(): - register = self.interface.get_register_by_name(point) + :param registry_config: A list of registry points represented as PointConfigs + :param base_topic: The portion of the topic shared by all points in this registry. + """ + for register_config in registry_config: + register = self.interface.create_register(register_config) + self.interface.insert_register(register, base_topic) + # TODO: Finalize method is only used by bacnet, currently, and that pauses 30s on each device if it can't connect. + # try: + # self.interface.finalize_setup(initial_setup=True) + # except BaseException as e: + # _log.warning(f'Exception occurred while finalizing setup of interface for {self.unique_id}: {e}.') + + for point_name in self.interface.get_register_names(): + register = self.interface.get_register_by_name(point_name) + point = self.equipment_model.get_node(point_name) + # TODO: It might be more reasonable to either have the register be aware of the type mappings or have a + # type-mapping function separately. This is rather limiting. What is "ts" anyway? TypeScript? if register.register_type == 'bit': ts_type = 'boolean' else: @@ -233,155 +96,88 @@ def setup_device(self): ts_type = 'float' elif register.python_type is str: ts_type = 'string' - - self.meta_data[point] = { + # TODO: Why is there not an else here? ts_type may be undefined. + # TODO: meta_data may belong in the PointNode object. This function could take points instead of their + # configs and pack the data into the PointNode instead of a separate dictionary in this class. + point.meta_data = { 'units': register.get_units(), 'type': ts_type, - 'tz': config.get('timezone', '') + 'tz': self.tz } - self.base_topic = DEVICES_VALUE(campus='', - building='', - unit='', - path=self.device_path, - point=None) - - self.device_name = DEVICES_PATH(base='', - node='', - campus='', - building='', - unit='', - path=self.device_path, - point='') - - # self.parent.device_startup_callback(self.device_name, self) - - def periodic_read(self, now): - #we not use self.core.schedule to prevent drift. - next_scrape_time = now + datetime.timedelta(seconds=self.interval) - # Sanity check now. - # This is specifically for when this is running in a VM that gets - # suspended and then resumed. - # If we don't make this check a resumed VM will publish one event - # per minute of - # time the VM was suspended for. - test_now = get_aware_utc_now() - if test_now - next_scrape_time > datetime.timedelta(seconds=self.interval): - next_scrape_time = self.find_starting_datetime(test_now) - - _log.debug("{} next scrape scheduled: {}".format(self.device_path, next_scrape_time)) - - self.periodic_read_event = self.core.schedule(next_scrape_time, self.periodic_read, - next_scrape_time) - - _log.debug("scraping device: " + self.device_name) - - self.parent.scrape_starting(self.device_name) - + def poll_data(self, current_points, publish_setup): + _log.debug(f'@@@@@ Polling: {self.unique_id}') + if self.scalability_test: # TODO: Update scalability testing. + self.scalability_test.poll_starting(self.unique_id) try: - results = self.interface.scrape_all() - register_names = self.interface.get_register_names_view() - for point in (register_names - results.keys()): - depth_first_topic = self.base_topic(point=point) - _log.error("Failed to scrape point: " + depth_first_topic) - except (Exception, gevent.Timeout) as ex: - tb = traceback.format_exc() - _log.error('Failed to scrape ' + self.device_name + ':\n' + tb) - return - - # XXX: Does a warning need to be printed? - if not results: - return - - utcnow = get_aware_utc_now() - utcnow_string = format_timestamp(utcnow) - sync_timestamp = format_timestamp(now - datetime.timedelta(seconds=self.time_slot_offset)) - - headers = { - headers_mod.DATE: utcnow_string, - headers_mod.TIMESTAMP: utcnow_string, - headers_mod.SYNC_TIMESTAMP: sync_timestamp - } - - if self.publish_depth_first or self.publish_breadth_first: - for point, value in results.items(): - depth_first_topic, breadth_first_topic = self.get_paths_for_point(point) - message = [value, self.meta_data[point]] - - if self.publish_depth_first: - self._publish_wrapper(depth_first_topic, headers=headers, message=message) - - if self.publish_breadth_first: - self._publish_wrapper(breadth_first_topic, headers=headers, message=message) - - message = [results, self.meta_data] - if self.publish_depth_first_all: - self._publish_wrapper(self.all_path_depth, headers=headers, message=message) - - if self.publish_breadth_first_all: - self._publish_wrapper(self.all_path_breadth, headers=headers, message=message) - - self.parent.scrape_ending(self.device_name) - - def _publish_wrapper(self, topic, headers, message): - while True: - try: - with publish_lock(): - _log.debug("publishing: " + topic) - self.vip.pubsub.publish('pubsub', topic, headers=headers, - message=message).get(timeout=10.0) - - _log.debug("finish publishing: " + topic) - except gevent.Timeout: - _log.warning("Did not receive confirmation of publish to " + topic) - break - except Again: - _log.warning("publish delayed: " + topic + " pubsub is busy") - gevent.sleep(random.random()) - except VIPError as ex: - _log.warning("driver failed to publish " + topic + ": " + str(ex)) - break - else: - break + results, errors = self.interface.get_multiple_points(current_points.keys()) + for failed_point, failure_message in errors.items(): + _log.warning(f'Failed to poll {failed_point}: {failure_message}') + if results: + for topic, value in results.items(): + try: + current_points[topic].last_value = value + except KeyError as e: + _log.warning(f'Failed to set last value: "{e}". Point may no longer be found in EquipmentTree.') + self.publish(results, publish_setup) + return True # TODO: There could really be better logic in the method to measure success. + except (Exception, gevent.Timeout) as e: + _log.error(f'Exception while polling {self.unique_id}: {e}') + if self.config.debug: + # TODO: Add an RPC to turn on debugging for individual remotes. Maybe for nodes as well? + tb = traceback.format_exc() + _log.error(tb) + return False + finally: + if self.scalability_test: + self.scalability_test.poll_ending(self.unique_id) + + # noinspection DuplicatedCode + def publish(self, results, publish_setup): + headers = publication_headers() + # TODO: Should probably wrap in try block(s). One for each loop, so it catches failure with single iterations? + for point_topic in publish_setup['single_depth']: + publish_wrapper(self.vip, point_topic, headers=headers, message=[ + results[point_topic], self.equipment_model.get_node(point_topic).meta_data + ]) + for point_topic, publish_topic in publish_setup['single_breadth']: + publish_wrapper(self.vip, publish_topic, headers=headers, message=[ + results[point_topic], self.equipment_model.get_node(point_topic).meta_data + ]) + for device_topic, points in publish_setup['multi_depth'].items(): + publish_wrapper(self.vip, f'{device_topic}/multi', headers=headers, message=[ + {point.rsplit('/', 1)[-1]: results[point] for point in points if point in results}, + {point.rsplit('/', 1)[-1]: self.equipment_model.get_node(point).meta_data for point in points} + ]) + for (device_topic, publish_topic), points in publish_setup['multi_breadth'].items(): + publish_wrapper(self.vip, f'{publish_topic}/multi', headers=headers, message=[ + {point.rsplit('/', 1)[-1]: results[point] for point in points if point in results}, + {point.rsplit('/', 1)[-1]: self.equipment_model.get_node(point).meta_data for point in points} + ]) def heart_beat(self): - if self.heart_beat_point is None: + if self.config.heart_beat_point is None: return - self.heart_beat_value = int(not bool(self.heart_beat_value)) + self.set_point(self.config.heart_beat_point, self.heart_beat_value) - _log.debug("sending heartbeat: " + self.device_name + ' ' + str(self.heart_beat_value)) + def get_point(self, topic, **kwargs): + return self.interface.get_point(topic, **kwargs) - self.set_point(self.heart_beat_point, self.heart_beat_value) - - def get_paths_for_point(self, point): - depth_first = self.base_topic(point=point) - - parts = depth_first.split('/') - breadth_first_parts = parts[1:] - breadth_first_parts.reverse() - breadth_first_parts = [DRIVER_TOPIC_BASE] + breadth_first_parts - breadth_first = '/'.join(breadth_first_parts) - - return depth_first, breadth_first - - def get_point(self, point_name, **kwargs): - return self.interface.get_point(point_name, **kwargs) - - def set_point(self, point_name, value, **kwargs): - return self.interface.set_point(point_name, value, **kwargs) + def set_point(self, topic, value, **kwargs): + return self.interface.set_point(topic, value, **kwargs) def scrape_all(self): return self.interface.scrape_all() - def get_multiple_points(self, point_names, **kwargs): - return self.interface.get_multiple_points(self.device_name, point_names, **kwargs) + def get_multiple_points(self, topics, **kwargs): + return self.interface.get_multiple_points(topics, **kwargs) - def set_multiple_points(self, point_names_values, **kwargs): - return self.interface.set_multiple_points(self.device_name, point_names_values, **kwargs) + def set_multiple_points(self, topics_values, **kwargs): + return self.interface.set_multiple_points(topics_values, **kwargs) - def revert_point(self, point_name, **kwargs): - self.interface.revert_point(point_name, **kwargs) + def revert_point(self, topic, **kwargs): + self.interface.revert_point(topic, **kwargs) def revert_all(self, **kwargs): self.interface.revert_all(**kwargs) @@ -392,32 +188,30 @@ def publish_cov_value(self, point_name, point_values): :param point_name: point which sent COV notifications :param point_values: COV point values """ - utcnow = get_aware_utc_now() - utcnow_string = format_timestamp(utcnow) - headers = { - headers_mod.DATE: utcnow_string, - headers_mod.TIMESTAMP: utcnow_string, - } - for point, value in point_values.items(): - results = {point_name: value} - meta = {point_name: self.meta_data[point_name]} - all_message = [results, meta] - individual_point_message = [value, self.meta_data[point_name]] - - depth_first_topic, breadth_first_topic = self.get_paths_for_point(point_name) - - if self.publish_depth_first: - self._publish_wrapper(depth_first_topic, - headers=headers, - message=individual_point_message) - # - if self.publish_breadth_first: - self._publish_wrapper(breadth_first_topic, - headers=headers, - message=individual_point_message) - - if self.publish_depth_first_all: - self._publish_wrapper(self.all_path_depth, headers=headers, message=all_message) - - if self.publish_breadth_first_all: - self._publish_wrapper(self.all_path_breadth, headers=headers, message=all_message) + et = self.equipment_model + headers = publication_headers() + for point, value in point_values.items(): # TODO: How is point different from point_name? + # TODO: Several of these things can be outside loop if the point in the loop isn't important. + point_depth_topic, point_breadth_topic = et.get_point_topics(point_name) + device_depth_topic, device_breadth_topic = et.get_device_topics(point_name) + point_node = self.equipment_model.get_node(point_name) + if et.is_published_single_depth(point_name): + publish_wrapper(self.vip, point_depth_topic, headers, [value, point_node.meta_data]) + if et.is_published_single_breadth(point_name): + publish_wrapper(self.vip, point_breadth_topic, headers, [value, point_node.meta_data]) + if et.is_published_multi_depth(point_name) or et.is_published_all_depth(point_name): + publish_wrapper(self.vip, device_depth_topic, headers, + [{point_name: value}, {point_name: point_node.meta_data}]) + if et.is_published_multi_breadth(point_name) or et.is_published_all_breadth(point_name): + publish_wrapper(self.vip, device_breadth_topic, headers, + [{point_name: value}, {point_name: point_node.meta_data}]) + + def add_equipment(self, device_node): + # TODO: Is logic needed for scheduling or any other purpose on adding equipment to this remote? + self.add_registers([p.config for p in self.equipment_model.points(device_node.identifier)], + device_node.identifier) + self.equipment.add(device_node) + + @property + def point_set(self): + return {point for equip in self.equipment for point in self.equipment_model.points(equip.identifier)} diff --git a/src/volttron/driver/base/driver_locks.py b/src/volttron/driver/base/driver_locks.py index 6b967da..faf9839 100644 --- a/src/volttron/driver/base/driver_locks.py +++ b/src/volttron/driver/base/driver_locks.py @@ -21,13 +21,55 @@ # # ===----------------------------------------------------------------------=== # }}} +import logging +import resource from contextlib import contextmanager - from gevent.lock import BoundedSemaphore, DummySemaphore + +_log = logging.getLogger(__name__) + _socket_lock = None +def get_system_socket_limit(): + # Increase open files resource limit to max or 8192 if unlimited + system_socket_limit = None + try: + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + except OSError: + _log.exception('error getting open file limits') + else: + if soft != hard and soft != resource.RLIM_INFINITY: + try: + system_socket_limit = 8192 if hard == resource.RLIM_INFINITY else hard + resource.setrlimit(resource.RLIMIT_NOFILE, (system_socket_limit, hard)) + except OSError: + _log.exception('error setting open file limits') + else: + _log.debug('open file resource limit increased from %d to %d', soft, + system_socket_limit) + if soft == hard: + system_socket_limit = soft + return system_socket_limit + +def setup_socket_lock(max_open_sockets=None): + if max_open_sockets is not None: + max_connections = int(max_open_sockets) + _log.info("maximum concurrently open sockets limited to " + str(max_open_sockets)) + else: + system_socket_limit = get_system_socket_limit() + if system_socket_limit is not None: + max_connections = int(system_socket_limit * 0.8) + _log.info("maximum concurrently open sockets limited to " + str(max_open_sockets) + + " (derived from system limits)") + else: + max_connections = 0 + _log.warning( + "No limit set on the maximum number of concurrently open sockets. " + "Consider setting max_open_sockets if you plan to work with 800+ modbus devices." + ) + configure_socket_lock(max_connections) def configure_socket_lock(max_connections=0): global _socket_lock @@ -55,6 +97,13 @@ def socket_lock(): def configure_publish_lock(max_connections=0): + if max_connections < 1: + _log.warning( + "No limit set on the maximum number of concurrent driver publishes. " + "Consider setting max_concurrent_publishes if you plan to work with many devices." + ) + else: + _log.info("maximum concurrent driver publishes limited to " + str(max_connections)) global _publish_lock if _publish_lock is not None: raise RuntimeError("socket_lock already configured!") diff --git a/src/volttron/driver/base/interfaces.py b/src/volttron/driver/base/interfaces.py index f546d7c..3af4b4b 100644 --- a/src/volttron/driver/base/interfaces.py +++ b/src/volttron/driver/base/interfaces.py @@ -63,7 +63,7 @@ - :py:meth:`BaseInterface.configure` - :py:meth:`BaseInterface.set_point` - :py:meth:`BaseInterface.get_point` -- :py:meth:`BaseInterface.scrape_all` +- :py:meth:`BaseInterface.get_multiple_points` These methods are required but can be implemented using the :py:class:`BasicRevert` mixin. @@ -98,8 +98,8 @@ Device Scraping --------------- - -The work scheduling and publish periodic device scrapes is handled by +# TODO: Documentation in these files is all wrong now. +The work scheduling and publish periodic device polls is handled by the Platform Driver Agent. When a scrape starts the Platform Driver Agent calls the :py:meth:`BaseInterface.scrape_all`. It will take the results of the call and attach meta data and and publish as needed. @@ -156,6 +156,13 @@ import abc import logging +from collections.abc import KeysView +from weakref import WeakSet + +from volttron.utils import get_module, get_subclasses + +from volttron.driver.base.config import PointConfig, RemoteConfig + _log = logging.getLogger(__name__) @@ -174,13 +181,13 @@ class BaseRegister: :param register_type: Type of the register. Either "bit" or "byte". Usually "byte". :param read_only: Specify if the point can be written to. - :param pointName: Name of the register. + :param point_name: Name of the register. :param units: Units of the value of the register. :param description: Description of the register. :type register_type: str :type read_only: bool - :type pointName: str + :type point_name: str :type units: str :type description: str @@ -189,10 +196,10 @@ class BaseRegister: string for the units argument. """ - def __init__(self, register_type, read_only, pointName, units, description=''): + def __init__(self, register_type, read_only, point_name, units, description=''): self.read_only = read_only self.register_type = register_type - self.point_name = pointName + self.point_name = point_name self.units = units self.description = description self.python_type = int @@ -237,42 +244,37 @@ class BaseInterface(object, metaclass=abc.ABCMeta): """ - def __init__(self, vip=None, core=None, **kwargs): + REGISTER_CONFIG_CLASS = PointConfig + INTERFACE_CONFIG_CLASS = RemoteConfig + + def __init__(self, config: RemoteConfig, core, vip, *args, **kwargs): # Object does not take any arguments to the init. super(BaseInterface, self).__init__() - self.vip = vip + self.config = config + # TODO: Reevaluate whether vip and core are necessary here. They are currently only used by the bacnet interface. self.core = core + self.vip = vip - self.point_map = {} - - self.build_register_map() - def build_register_map(self): + self.point_map = {} self.registers = { - ('byte', True): [], - ('byte', False): [], - ('bit', True): [], - ('bit', False): [] + ('byte', True): WeakSet(), + ('byte', False): WeakSet(), + ('bit', True): WeakSet(), + ('bit', False): WeakSet() } - @abc.abstractmethod - def configure(self, config_dict, registry_config_str): - """ - Configures the :py:class:`Interface` for the specific instance of a device. - - :param config_dict: The "driver_config" section of the driver configuration file. - :param registry_config_str: The contents of the registry configuration file. - :type config_dict: dict - :type registry_config_str: str + def finalize_setup(self, initial_setup: bool = False): + """Finalize setup will be called after the interface has been configured and all registers have been inserted. + It will be called again after changes are made to configurations or registers + to perform any post-change setup. + Interfaces should override this method if post-configuration setup is required. - - This method must setup register representations of all points - on a device by creating instances of :py:class:`BaseRegister` (or a subclass) and adding them - to the Interface with :py:meth:`BaseInterface.insert_register`. - """ + :param initial_setup True on the first call. False for calls after changes. + """ pass - def get_register_by_name(self, name): + def get_register_by_name(self, name: str) -> type[BaseRegister]: """ Get a register by it's point name. @@ -316,32 +318,36 @@ def get_registers_by_type(self, reg_type, read_only): """ return self.registers[reg_type, read_only] - def insert_register(self, register): + @abc.abstractmethod + def create_register(self, register_definition: PointConfig) -> type[BaseRegister]: + """Create a register instance from the provided PointConfig. + + :param register_definition: PointConfig from which to create a Register instance. + """ + + def insert_register(self, register: BaseRegister, base_topic: str): """ Inserts a register into the :py:class:`Interface`. :param register: Register to add to the interface. - :type register: :py:class:`BaseRegister` + :param base_topic: Topic up to the point name. """ - register_point = register.point_name - self.point_map[register_point] = register - - register_type = register.get_register_type() - self.registers[register_type].append(register) + self.point_map['/'.join([base_topic, register.point_name])] = register + self.registers[register.get_register_type()].add(register) @abc.abstractmethod - def get_point(self, point_name, **kwargs): + def get_point(self, topic, **kwargs): """ Get the current value for the point name given. - :param point_name: Name of the point to retrieve. + :param topic: Name of the point to retrieve. :param kwargs: Any interface specific parameters. - :type point_name: str + :type topic: str :return: Point value """ @abc.abstractmethod - def set_point(self, point_name, value, **kwargs): + def set_point(self, topic, value, **kwargs): """ Set the current value for the point name given. @@ -352,23 +358,13 @@ def set_point(self, point_name, value, **kwargs): acceptable to return the value that was requested if no error occurs. - :param point_name: Name of the point to retrieve. + :param topic: Name of the point to retrieve. :param value: Value to set the point to. :param kwargs: Any interface specific parameters. - :type point_name: str + :type topic: str :return: Actual point value set. """ - @abc.abstractmethod - def scrape_all(self): - """ - Method the Platform Driver Agent calls to get the current state - of a device for publication. - - :return: Point names to values for device. - :rtype: dict - """ - @abc.abstractmethod def revert_all(self, **kwargs): """ @@ -378,49 +374,40 @@ def revert_all(self, **kwargs): """ @abc.abstractmethod - def revert_point(self, point_name, **kwargs): + def revert_point(self, topic, **kwargs): """ Revert point to it's default state. :param kwargs: Any interface specific parameters. """ - def get_multiple_points(self, path, point_names, **kwargs): + @abc.abstractmethod + def get_multiple_points(self, topics: KeysView[str], **kwargs) -> (dict, dict): """ Read multiple points from the interface. - :param path: Device path - :param point_names: Names of points to retrieve + :param topics: Names of points to retrieve :param kwargs: Any interface specific parameters - :type path: str - :type point_names: [str] - :type kwargs: dict - :returns: Tuple of dictionaries to results and any errors :rtype: (dict, dict) """ results = {} errors = {} - - for point_name in point_names: - return_key = path + '/' + point_name + for topic in topics: try: - value = self.get_point(point_name, **kwargs) - results[return_key] = value + value = self.get_point(topic, **kwargs) + results[topic] = value except Exception as e: - errors[return_key] = repr(e) + errors[topic] = repr(e) return results, errors - def set_multiple_points(self, path, point_names_values, **kwargs): + def set_multiple_points(self, topics_values, **kwargs): """ Set multiple points on the interface. - :param path: Device path - :param point_names_values: Point names and values to be set to. + :param topics_values: Topics and values to which they will be set. :param kwargs: Any interface specific parameters - :type path: str - :type point_names: [(str, k)] where k is the new value :type kwargs: dict :returns: Dictionary of points to any exceptions raised @@ -428,14 +415,34 @@ def set_multiple_points(self, path, point_names_values, **kwargs): """ results = {} - for point_name, value in point_names_values: + for topic, value in topics_values: try: - self.set_point(point_name, value, **kwargs) + self.set_point(topic, value, **kwargs) except Exception as e: - results[path + '/' + point_name] = repr(e) + results[topic] = repr(e) return results + @classmethod + def get_interface_subclass(cls, driver_type, module=None): + """Get Interface SubClass + Returns the subclass of this class in the module located from driver configuration or from the interface name. + """ + module_name = module if module is not None else f"volttron.driver.interfaces.{driver_type}.{driver_type}" + module = get_module(module_name) + subclasses = get_subclasses(module, cls) + return subclasses[0] + + @classmethod + def unique_remote_id(cls, config_name: str, config: RemoteConfig) -> tuple: + """Unique Remote ID + Subclasses should use this class method to return a hashable identifier which uniquely identifies a single + remote -- e.g., if multiple remotes may exist at a single IP address, but on different ports, + the unique ID might be the tuple: (ip_address, port). + The base class returns the name of the device configuration file, requiring a separate DriverAgent for each. + """ + return config_name, + class RevertTracker: """ @@ -522,8 +529,6 @@ def get_all_revert_values(self): If no default value is set and a no clean values have been submitted a point value will be an instance of :py:class:`DriverInterfaceError`. - :param point: Name of point to get. - :type point: str :return: Values to revert to. :rtype: dict """ @@ -565,7 +570,6 @@ class BasicRevert(object, metaclass=abc.ABCMeta): """ def __init__(self, **kwargs): - super(BasicRevert, self).__init__(**kwargs) self._tracker = RevertTracker() def _update_clean_values(self, points): @@ -581,27 +585,27 @@ def set_default(self, point, value): """ self._tracker.set_default(point, value) - def set_point(self, point_name, value): + def set_point(self, topic, value): """ Implementation of :py:meth:`BaseInterface.set_point` Passes arguments through to :py:meth:`BasicRevert._set_point` """ - result = self._set_point(point_name, value) - self._tracker.mark_dirty_point(point_name) + result = self._set_point(topic, value) + self._tracker.mark_dirty_point(topic) return result - def scrape_all(self): + def get_multiple_points(self, topics: KeysView[str], **kwargs) -> (dict, dict): """ Implementation of :py:meth:`BaseInterface.scrape_all` """ - result = self._scrape_all() - self._update_clean_values(result) + results, errors = self._get_multiple_points(topics, **kwargs) + self._update_clean_values(results) - return result + return results, errors @abc.abstractmethod - def _set_point(self, point_name, value): + def _set_point(self, topic, value): """ Set the current value for the point name given. @@ -616,25 +620,24 @@ def _set_point(self, point_name, value): acceptable to return the value that was requested if no error occurs. - :param point_name: Name of the point to retrieve. + :param topic: Name of the point to retrieve. :param value: Value to set the point to. :param kwargs: Any interface specific parameters. - :type point_name: str + :type topic: str :return: Actual point value set. """ @abc.abstractmethod - def _scrape_all(self): + def _get_multiple_points(self, topics: KeysView[str], **kwargs) -> (dict, dict): """ - Method the Platform Driver Agent calls to get the current state - of a device for publication. + Method the Platform Driver Agent calls to get multiple point values. If using this mixin you must override this method - instead of :py:meth:`BaseInterface.scrape_all`. Otherwise + instead of :py:meth:`BaseInterface.get_multiple_points`. Otherwise the purpose is exactly the same. :return: Point names to values for device. - :rtype: dict + :rtype: dict, dict """ def revert_all(self, **kwargs): @@ -643,41 +646,41 @@ def revert_all(self, **kwargs): Implementation of :py:meth:`BaseInterface.revert_all` - Calls :py:meth:`BasicRevert._set_point` with `point_name` + Calls :py:meth:`BasicRevert._set_point` with `topic` and the value to revert the point to for every writable point on a device. Currently \*\*kwargs is ignored. """ points = self._tracker.get_all_revert_values() - for point_name, value in points.items(): + for topic, value in points.items(): if not isinstance(value, DriverInterfaceError): try: - self._set_point(point_name, value) - self._tracker.clear_dirty_point(point_name) + self._set_point(topic, value) + self._tracker.clear_dirty_point(topic) except Exception as e: - _log.warning("Error while reverting point {}: {}".format(point_name, str(e))) + _log.warning("Error while reverting point {}: {}".format(topic, str(e))) - def revert_point(self, point_name, **kwargs): + def revert_point(self, topic, **kwargs): r""" Implementation of :py:meth:`BaseInterface.revert_point` Revert point to its default state. - Calls :py:meth:`BasicRevert._set_point` with `point_name` + Calls :py:meth:`BasicRevert._set_point` with `topic` and the value to revert the point to. - :param point_name: Name of the point to revert. - :type point_name: str + :param topic: Name of the point to revert. + :type topic: str Currently \*\*kwargs is ignored. """ try: - value = self._tracker.get_revert_value(point_name) + value = self._tracker.get_revert_value(topic) except DriverInterfaceError: return - _log.debug("Reverting {} to {}".format(point_name, value)) + _log.info("Reverting {} to {}".format(topic, value)) - self._set_point(point_name, value) - self._tracker.clear_dirty_point(point_name) + self._set_point(topic, value) + self._tracker.clear_dirty_point(topic) diff --git a/src/volttron/driver/base/utils.py b/src/volttron/driver/base/utils.py new file mode 100644 index 0000000..24e8d1b --- /dev/null +++ b/src/volttron/driver/base/utils.py @@ -0,0 +1,47 @@ +import gevent +import logging +import random + +from volttron.client.messaging import headers as headers_mod +from volttron.client.vip.agent.errors import Again, VIPError +from volttron.utils import format_timestamp, get_aware_utc_now + +from .driver_locks import publish_lock + + +_log = logging.getLogger(__name__) + + +def publication_headers(): + # TODO: Sync_Timestamp won't work, so far, because time_slot_offset assumed the device was polled once per round. + # Since we are polling through a hyperperiod that may include multiple rounds for a given point or equipment, + # this no longer makes sense. Also, what if some points are polled multiple times compared to others? + # CAN SCHEDULED ALL_PUBLISHES REPLACE THIS MECHANISM IF THEY ARE GENERATED ALL AT ONCE? + # OR JUST USE THIS IS ALL-TYPE PUBLISHES ON A SCHEDULE? + utcnow_string = format_timestamp(get_aware_utc_now()) + headers = { + headers_mod.DATE: utcnow_string, + headers_mod.TIMESTAMP: utcnow_string, + # headers_mod.SYNC_TIMESTAMP: format_timestamp(current_start - timedelta(seconds=self.time_slot_offset)) + } + return headers + +def publish_wrapper(vip, topic, headers, message): + while True: + try: + with publish_lock(): + _log.debug("publishing: " + topic) + # TODO: Do we really need to block on every publish call? + vip.pubsub.publish('pubsub', topic, headers=headers, message=message).get(timeout=10.0) + _log.debug("finish publishing: " + topic) + except gevent.Timeout: + _log.warning("Did not receive confirmation of publish to " + topic) + break + except Again: + _log.warning("publish delayed: " + topic + " pubsub is busy") + gevent.sleep(random.random()) + except VIPError as ex: + _log.warning("driver failed to publish " + topic + ": " + str(ex)) + break + else: + break \ No newline at end of file diff --git a/tests/test_driver.py b/tests/test_driver.py index 612751f..d7e8dc3 100644 --- a/tests/test_driver.py +++ b/tests/test_driver.py @@ -50,8 +50,8 @@ def test_update_publish_types_should_only_set_depth_first_to_true(): driver_agent.update_publish_types(publish_depth_first_all, publish_breadth_first_all, publish_depth_first, publish_breadth_first) - assert not driver_agent.publish_depth_first_all - assert not driver_agent.publish_breadth_first_all + assert not driver_agent.publish_all_depth + assert not driver_agent.publish_all_breadth assert driver_agent.publish_depth_first assert not driver_agent.publish_breadth_first @@ -159,8 +159,8 @@ def test_periodic_read_should_succeed(): interface_scrape_all={"foo": "bar"}) as driver_agent: driver_agent.periodic_read(now) - driver_agent.parent.scrape_starting.assert_called_once() - driver_agent.parent.scrape_ending.assert_called_once() + driver_agent.agent.scrape_starting.assert_called_once() + driver_agent.agent.scrape_ending.assert_called_once() driver_agent._publish_wrapper.assert_called_once() assert isinstance(driver_agent.periodic_read_event, ScheduledEvent) @@ -176,8 +176,8 @@ def test_periodic_read_should_return_none_on_scrape_response(scrape_all_response result = driver_agent.periodic_read(now) assert result is None - driver_agent.parent.scrape_starting.assert_called_once() - driver_agent.parent.scrape_ending.assert_not_called() + driver_agent.agent.scrape_starting.assert_called_once() + driver_agent.agent.scrape_ending.assert_not_called() driver_agent._publish_wrapper.assert_not_called() assert isinstance(driver_agent.periodic_read_event, ScheduledEvent) diff --git a/tests/utils.py b/tests/utils.py index dd0e90e..3810c05 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -113,6 +113,7 @@ def base_driver_agent(has_base_topic: bool = False, driver_agent.core.schedule.cancel = None if meta_data is not None: + # TODO: drive_agent.meta_data is not stored in each point node instead of a dictionary in the driver agent. driver_agent.meta_data = meta_data if mock_publish_wrapper: