diff --git a/.gitignore b/.gitignore index 305e73e838..43b6d6be2b 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,11 @@ rabbitmq-server.download.tar.xz *ecobee_*.json .env* dist/ + +Pipfile +modbus_configs +modbus_tk_configs +activateenv +volttron.service +upgrade-scripts +volttron_modbus.log diff --git a/docs/source/setup/index.rst b/docs/source/setup/index.rst new file mode 100644 index 0000000000..e69de29bb2 diff --git a/services/core/PlatformDriverAgent/platform_driver/driver.py b/services/core/PlatformDriverAgent/platform_driver/driver.py index b86688f465..cc44cd432f 100644 --- a/services/core/PlatformDriverAgent/platform_driver/driver.py +++ b/services/core/PlatformDriverAgent/platform_driver/driver.py @@ -231,9 +231,16 @@ def periodic_read(self, now): try: results = self.interface.scrape_all() register_names = self.interface.get_register_names_view() + _log.debug(f"results keys: {results.keys}") + _log.debug(f"register_names keys: {register_names}") + # register_names contains list of all points + # results.keys gives points from the device + # The following loop will check if there are more points from the device than what + # was expected and then log an error for point in (register_names - results.keys()): + _log.debug(f"Scraping point: {point}") depth_first_topic = self.base_topic(point=point) - _log.error("Failed to scrape point: "+depth_first_topic) + _log.error(f"Failed to scrape point: {depth_first_topic}") except (Exception, gevent.Timeout) as ex: tb = traceback.format_exc() _log.error('Failed to scrape ' + self.device_name + ':\n' + tb) diff --git a/services/core/PlatformDriverAgent/platform_driver/driver_locks.py b/services/core/PlatformDriverAgent/platform_driver/driver_locks.py index 2b0c0d9774..aabbcf31ed 100644 --- a/services/core/PlatformDriverAgent/platform_driver/driver_locks.py +++ b/services/core/PlatformDriverAgent/platform_driver/driver_locks.py @@ -22,9 +22,15 @@ # ===----------------------------------------------------------------------=== # }}} +from collections import defaultdict +import logging from gevent.lock import BoundedSemaphore, DummySemaphore from contextlib import contextmanager +from volttron.platform.agent import utils +utils.setup_logging() +_log = logging.getLogger(__name__) + _socket_lock = None def configure_socket_lock(max_connections=0): @@ -68,3 +74,44 @@ def publish_lock(): yield finally: _publish_lock.release() + +_client_socket_locks = defaultdict(lambda: None) +lock_counter = 0 + +def configure_client_socket_lock(address, port, max_connections=0): + _log.debug("Configuring client socket lock for {}:{}".format(address, port)) + global _client_socket_locks + if _client_socket_locks[(address, port)] is not None: + if isinstance(_client_socket_locks[(address, port)], DummySemaphore) or isinstance(_client_socket_locks[(address, port)], BoundedSemaphore): + _log.debug(f"Client socket lock already configured for {address}:{port}") + return + else: + raise RuntimeError("client socket lock already configured!") + if max_connections < 1: + _client_socket_locks[(address, port)] = DummySemaphore() + else: + _client_socket_locks[(address, port)] = BoundedSemaphore(max_connections) + +@contextmanager +def client_socket_locks(address, port): + global _client_socket_locks + lock = _client_socket_locks[(address, port)] + _log.debug(f"ADDRESS: {address}") + _log.debug(f"PORT: {port}") + _log.debug(f"Acquiring client socket lock ({type(lock)}) for {address}:{port} at {id(lock)}") + if lock is None: + _log.debug(f"socket_lock not configured {address}:{port}") + _log.debug(f"lock is None: lock: {lock}, type: {type(lock)}, id ${id(lock)}") + raise RuntimeError("socket_lock not configured!") + lock.acquire() + global lock_counter + lock_counter +=1 + _log.debug(f"lock_counter: {lock_counter}") + + try: + yield + finally: + _log.debug(f"Releasing client socket lock ({type(lock)}) for {address}:{port} at {id(lock)}") + lock.release() + lock_counter -=1 + _log.debug(f"lock_counter after release: {lock_counter}") \ No newline at end of file diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/__init__.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/__init__.py index a663498c44..d397840f01 100644 --- a/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/__init__.py +++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/__init__.py @@ -27,6 +27,7 @@ from platform_driver.interfaces import BaseRegister, BaseInterface, BasicRevert from platform_driver.interfaces.modbus_tk import helpers from platform_driver.interfaces.modbus_tk.maps import Map +from platform_driver.driver_locks import configure_client_socket_lock import logging import struct @@ -297,6 +298,9 @@ def configure(self, config_dict, registry_config_lst): endian=endian, registry_config_lst=selected_registry_config_lst ).get_class() + + _log.debug(f"CONFIGURING CLIENT LOCKS FOR MODBUSTK") + configure_client_socket_lock(device_address, port, max_connections=1) self.modbus_client = modbus_client_class(device_address=device_address, port=port, diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/client.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/client.py index 6581e3cff2..dcfa58e07e 100644 --- a/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/client.py +++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/client.py @@ -48,18 +48,23 @@ class MyModbusMaster (Client): from datetime import datetime import collections import struct +import gevent import serial import six.moves import logging import math +from volttron.platform.agent import utils import modbus_tk.defines as modbus_constants import modbus_tk.modbus_tcp as modbus_tcp import modbus_tk.modbus_rtu as modbus_rtu -from modbus_tk.exceptions import ModbusError +from modbus_tk.exceptions import ModbusError, ModbusInvalidResponseError +from platform_driver.driver_locks import client_socket_locks from . import helpers +# utils.setup_logging() +# _log = logging.getLogger(__name__) logger = logging.getLogger(__name__) # In cache representation of modbus field. @@ -310,7 +315,7 @@ def fix_address(self, address_style): self._address = self._address - helpers.TABLE_ADDRESS[self._table] elif address_style == helpers.ADDRESS_OFFSET_PLUS_ONE: self._address = self._address - 1 - if self._address < 0 or self._address > 10000: + if self._address < 0 or self._address > (2 ** 16 - 1): raise Exception("Modbus address out of range for table.") @@ -608,6 +613,8 @@ def __init__(self, *args, **kwargs): self._error_count = 0 def set_transport_tcp(self, hostname, port, timeout_in_sec=1.0): + self.device_address = hostname + self.port = port self.client = modbus_tcp.TcpMaster(host=hostname, port=int(port), timeout_in_sec=timeout_in_sec) return self @@ -657,30 +664,101 @@ def get_request(self, field): return self.__meta[helpers.META_REQUEST_MAP].get(field, None) def read_request(self, request): - logger.debug("Requesting: %s", request) - try: - results = self.client.execute( - self.slave_address, - request.read_function_code, - request.address, - quantity_of_x=request.count, - data_format=request.formatting, - threadsafe=False - ) - self._data.update(request.parse_values(results)) - except (AttributeError, ModbusError) as err: - if err is ModbusError: - code = err.get_exception_code() - raise Exception(f'{err.args[0]}, {helpers.TABLE_EXCEPTION_CODE.get(code, "UNDEFINED")}') - logger.warning("modbus read_all() failure on request: %s\tError: %s", request, err) + results = self.client.execute( + self.slave_address, + request.read_function_code, + request.address, + quantity_of_x=request.count, + data_format=request.formatting, + threadsafe=False + ) + logger.debug("Successfully read the request...") + self._data.update(request.parse_values(results)) + # try: + # results = self.client.execute( + # self.slave_address, + # request.read_function_code, + # request.address, + # quantity_of_x=request.count, + # data_format=request.formatting, + # threadsafe=False + # ) + # logger.debug("Successfully read the request...") + # self._data.update(request.parse_values(results)) + # except (AttributeError, ModbusError) as err: + # if "Exception code" in err.message: + # msg = "{0}: {1}".format(err.message, + # helpers.TABLE_EXCEPTION_CODE.get(err.message[-1], "UNDEFINED")) + # logger.debug(msg) + # raise Exception("{0}: {1}".format(msg)) + # logger.warning("modbus read_all() failure on request: %s\tError: %s", request, err) + + def timer(slogger): + """Print the runtime of the decorated function""" + from functools import wraps + import time + def decorator_timer(func): + @wraps(func) + def wrapper_timer(*args, **kwargs): + start_time = datetime.now() # 1 + value = func(*args, **kwargs) + end_time = datetime.now() # 2 + run_time_sec = end_time - start_time + slogger.debug( + f"Finished {func.__name__!r} in {run_time_sec.total_seconds()} seconds" + ) + return value + + return wrapper_timer + + return decorator_timer + @timer(logger) def read_all(self): + logger.debug(f"READ_ALL Time now: {datetime.now()}") requests = self.__meta[helpers.META_REQUESTS] self._data.clear() - for r in requests: - self.read_request(r) + # gets the lock + with client_socket_locks(self.device_address, self.port): + logger.debug(f"Entered lock for {self.device_address}:{self.port}-{self.slave_address}") + logger.debug(f"Total requests to be read: {len(requests)}") + for r in requests: + logger.debug(f"Attempting to read_request on request: {r}") + retries = 3 + while retries > 0: + logger.debug(f"Retry: {retries}") + exception_flag = False + try: + self.read_request(r) + break # can use break or continue + except ConnectionResetError: + exception_flag = True + logger.warning("ConnectionResetError on read_request()") + logger.warning(f"Error response: {e}") + except ModbusInvalidResponseError as e: + exception_flag = True + logger.warning("ModbusInvalidResponseError on read_request()") + logger.warning(f"Error response: {e}") + except Exception as e: + exception_flag = True + logger.warning("CATCHING ALL EXCEPTIONS") + logger.warning(f"Error response: {e}") + # if exception_flag: + # logger.warning("CLOSING SOCKET CONNECTION") + # self.client.close() + # gevent.sleep(1.0) + # logger.warning("OPENING SOCKET CONNECTION") + # self.client.open() + retries -= 1 + + if retries == 0: + logger.debug(f"Failed to read request: {r}") + else: + logger.debug(f"Succesfully read the request on retry: {retries}") + logger.debug(f"left lock for {self.device_address}:{self.port}-{self.slave_address}") + @timer(logger) def dump_all(self): self.read_all() return [(f, diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/helpers.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/helpers.py index 5a4a83a17d..6e2b72868a 100644 --- a/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/helpers.py +++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/modbus_tk/helpers.py @@ -178,7 +178,7 @@ def inverse_func(value): if value < 0: return (0 - (value / float(multiplier))) - 0xFFFF else: - return (value / float(multipliers)) + return (value / float(multiplier)) except TypeError: #string return value except ZeroDivisionError: diff --git a/volttron/platform/packaging.py b/volttron/platform/packaging.py index 2a82f2246c..f4956b1593 100644 --- a/volttron/platform/packaging.py +++ b/volttron/platform/packaging.py @@ -579,7 +579,7 @@ def main(argv=sys.argv): help='Directory to create the new agent in (must not exist).') init_parser.add_argument('module_name', help='Module name for agent. Class name is derived from this name.') - init_parser.add_argument('--template', choices=_get_agent_template_list(), + init_parser.add_argumentq('--template', choices=_get_agent_template_list(), help='Name of the template to use. Defaults to "common".') init_parser.add_argument('--identity', help='Set agent to have a fixed VIP identity. Useful for new service agents.')