Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/modbustk #3085

Open
wants to merge 25 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6bbd78f
pinned greenlet to compatible version for gevent
acedrew Nov 18, 2020
d65994a
fix for #2618
schandrika Feb 23, 2021
619dcfd
documented requirement for web packages for volttron 7.x. github iss…
schandrika Feb 23, 2021
b51576c
documented requirement for web packages for volttron 7.x. github iss…
schandrika Feb 23, 2021
9a7c90e
Merge pull request #2620 from schandrika/voltrron7_fix
shwethanidd Feb 23, 2021
36d887e
Merge branch 'VOLTTRON:releases/7.x' into releases/7.x
acedrew May 14, 2021
41eac5e
Pin 7.x branch #2795
craig8 Sep 19, 2021
951fe39
Merge branch '7x_rabbit_update' of https://github.com/shwethanidd/vol…
craig8 Sep 19, 2021
09b9d34
Updating rabbitmq dependency script since the rabbitmq server repos h…
shwethanidd Sep 21, 2021
bc81e78
Merge pull request #2800 from shwethanidd/7x_rabbit_update
craig8 Sep 21, 2021
5f19a74
Moved all requirements to requirements.py rather than split across bo…
craig8 Sep 21, 2021
b0f8192
Merge pull request #2799 from craig8/pin_v7_x
shwethanidd Sep 21, 2021
e861697
Fixing requirements.py related to wheels entry in option requirements
shwethanidd Sep 21, 2021
c540747
Merge pull request #2801 from shwethanidd/7x_rabbit_update
shwethanidd Sep 21, 2021
5bb3d9b
seems to be working, implemented only for reading
acedrew Apr 15, 2022
27442df
Merge branch 'hotfix-modbus-tk' into hotfix/modbustk
bonicim Apr 25, 2022
f4661af
Merge remote-tracking branch 'upstream/main' into hotfix/modbustk
bonicim Apr 25, 2022
dda15f0
Update logs
bonicim Apr 25, 2022
0ccf2a1
Fix reference from master_driver to platform_driver
bonicim Apr 25, 2022
0401b75
Update init for modbustk
bonicim May 3, 2022
b863431
Update drivers
bonicim May 4, 2022
ccba16d
Add test configs for modbus
bonicim May 10, 2022
a2b1d08
Add lock counter and logs
bonicim May 24, 2022
51eacea
Merge remote-tracking branch 'origin/develop' into hotfix/modbustk
bonicim May 9, 2024
ba5d6b1
Merge upstream develop.
rlutes Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,11 @@ rabbitmq-server.download.tar.xz
*ecobee_*.json
.env*
dist/

Pipfile
modbus_configs
modbus_tk_configs
activateenv
volttron.service
upgrade-scripts
volttron_modbus.log
Empty file added docs/source/setup/index.rst
Empty file.
9 changes: 8 additions & 1 deletion services/core/PlatformDriverAgent/platform_driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,16 @@ def periodic_read(self, now):
try:
results = self.interface.scrape_all()
register_names = self.interface.get_register_names_view()
_log.debug(f"results keys: {results.keys}")
_log.debug(f"register_names keys: {register_names}")
# register_names contains list of all points
# results.keys gives points from the device
# The following loop will check if there are more points from the device than what
# was expected and then log an error
for point in (register_names - results.keys()):
_log.debug(f"Scraping point: {point}")
depth_first_topic = self.base_topic(point=point)
_log.error("Failed to scrape point: "+depth_first_topic)
_log.error(f"Failed to scrape point: {depth_first_topic}")
except (Exception, gevent.Timeout) as ex:
tb = traceback.format_exc()
_log.error('Failed to scrape ' + self.device_name + ':\n' + tb)
Expand Down
47 changes: 47 additions & 0 deletions services/core/PlatformDriverAgent/platform_driver/driver_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@
# ===----------------------------------------------------------------------===
# }}}

from collections import defaultdict
import logging
from gevent.lock import BoundedSemaphore, DummySemaphore
from contextlib import contextmanager

from volttron.platform.agent import utils
utils.setup_logging()
_log = logging.getLogger(__name__)

_socket_lock = None

def configure_socket_lock(max_connections=0):
Expand Down Expand Up @@ -68,3 +74,44 @@ def publish_lock():
yield
finally:
_publish_lock.release()

_client_socket_locks = defaultdict(lambda: None)
lock_counter = 0

def configure_client_socket_lock(address, port, max_connections=0):
_log.debug("Configuring client socket lock for {}:{}".format(address, port))
global _client_socket_locks
if _client_socket_locks[(address, port)] is not None:
if isinstance(_client_socket_locks[(address, port)], DummySemaphore) or isinstance(_client_socket_locks[(address, port)], BoundedSemaphore):
_log.debug(f"Client socket lock already configured for {address}:{port}")
return
else:
raise RuntimeError("client socket lock already configured!")
if max_connections < 1:
_client_socket_locks[(address, port)] = DummySemaphore()
else:
_client_socket_locks[(address, port)] = BoundedSemaphore(max_connections)

@contextmanager
def client_socket_locks(address, port):
global _client_socket_locks
lock = _client_socket_locks[(address, port)]
_log.debug(f"ADDRESS: {address}")
_log.debug(f"PORT: {port}")
_log.debug(f"Acquiring client socket lock ({type(lock)}) for {address}:{port} at {id(lock)}")
if lock is None:
_log.debug(f"socket_lock not configured {address}:{port}")
_log.debug(f"lock is None: lock: {lock}, type: {type(lock)}, id ${id(lock)}")
raise RuntimeError("socket_lock not configured!")
lock.acquire()
global lock_counter
lock_counter +=1
_log.debug(f"lock_counter: {lock_counter}")

try:
yield
finally:
_log.debug(f"Releasing client socket lock ({type(lock)}) for {address}:{port} at {id(lock)}")
lock.release()
lock_counter -=1
_log.debug(f"lock_counter after release: {lock_counter}")
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from platform_driver.interfaces import BaseRegister, BaseInterface, BasicRevert
from platform_driver.interfaces.modbus_tk import helpers
from platform_driver.interfaces.modbus_tk.maps import Map
from platform_driver.driver_locks import configure_client_socket_lock

import logging
import struct
Expand Down Expand Up @@ -297,6 +298,9 @@ def configure(self, config_dict, registry_config_lst):
endian=endian,
registry_config_lst=selected_registry_config_lst
).get_class()

_log.debug(f"CONFIGURING CLIENT LOCKS FOR MODBUSTK")
configure_client_socket_lock(device_address, port, max_connections=1)

self.modbus_client = modbus_client_class(device_address=device_address,
port=port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,23 @@ class MyModbusMaster (Client):
from datetime import datetime
import collections
import struct
import gevent
import serial
import six.moves
import logging
import math

from volttron.platform.agent import utils
import modbus_tk.defines as modbus_constants
import modbus_tk.modbus_tcp as modbus_tcp
import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk.exceptions import ModbusError
from modbus_tk.exceptions import ModbusError, ModbusInvalidResponseError
from platform_driver.driver_locks import client_socket_locks

from . import helpers

# utils.setup_logging()
# _log = logging.getLogger(__name__)
logger = logging.getLogger(__name__)

# In cache representation of modbus field.
Expand Down Expand Up @@ -310,7 +315,7 @@ def fix_address(self, address_style):
self._address = self._address - helpers.TABLE_ADDRESS[self._table]
elif address_style == helpers.ADDRESS_OFFSET_PLUS_ONE:
self._address = self._address - 1
if self._address < 0 or self._address > 10000:
if self._address < 0 or self._address > (2 ** 16 - 1):
raise Exception("Modbus address out of range for table.")


Expand Down Expand Up @@ -608,6 +613,8 @@ def __init__(self, *args, **kwargs):
self._error_count = 0

def set_transport_tcp(self, hostname, port, timeout_in_sec=1.0):
self.device_address = hostname
self.port = port
self.client = modbus_tcp.TcpMaster(host=hostname, port=int(port), timeout_in_sec=timeout_in_sec)
return self

Expand Down Expand Up @@ -657,30 +664,101 @@ def get_request(self, field):
return self.__meta[helpers.META_REQUEST_MAP].get(field, None)

def read_request(self, request):
logger.debug("Requesting: %s", request)
try:
results = self.client.execute(
self.slave_address,
request.read_function_code,
request.address,
quantity_of_x=request.count,
data_format=request.formatting,
threadsafe=False
)
self._data.update(request.parse_values(results))
except (AttributeError, ModbusError) as err:
if err is ModbusError:
code = err.get_exception_code()
raise Exception(f'{err.args[0]}, {helpers.TABLE_EXCEPTION_CODE.get(code, "UNDEFINED")}')

logger.warning("modbus read_all() failure on request: %s\tError: %s", request, err)
results = self.client.execute(
self.slave_address,
request.read_function_code,
request.address,
quantity_of_x=request.count,
data_format=request.formatting,
threadsafe=False
)
logger.debug("Successfully read the request...")
self._data.update(request.parse_values(results))
# try:
# results = self.client.execute(
# self.slave_address,
# request.read_function_code,
# request.address,
# quantity_of_x=request.count,
# data_format=request.formatting,
# threadsafe=False
# )
# logger.debug("Successfully read the request...")
# self._data.update(request.parse_values(results))
# except (AttributeError, ModbusError) as err:
# if "Exception code" in err.message:
# msg = "{0}: {1}".format(err.message,
# helpers.TABLE_EXCEPTION_CODE.get(err.message[-1], "UNDEFINED"))
# logger.debug(msg)
# raise Exception("{0}: {1}".format(msg))
# logger.warning("modbus read_all() failure on request: %s\tError: %s", request, err)

def timer(slogger):
"""Print the runtime of the decorated function"""
from functools import wraps
import time
def decorator_timer(func):
@wraps(func)
def wrapper_timer(*args, **kwargs):
start_time = datetime.now() # 1
value = func(*args, **kwargs)
end_time = datetime.now() # 2
run_time_sec = end_time - start_time
slogger.debug(
f"Finished {func.__name__!r} in {run_time_sec.total_seconds()} seconds"
)
return value

return wrapper_timer

return decorator_timer

@timer(logger)
def read_all(self):
logger.debug(f"READ_ALL Time now: {datetime.now()}")
requests = self.__meta[helpers.META_REQUESTS]
self._data.clear()
for r in requests:
self.read_request(r)
# gets the lock
with client_socket_locks(self.device_address, self.port):
logger.debug(f"Entered lock for {self.device_address}:{self.port}-{self.slave_address}")
logger.debug(f"Total requests to be read: {len(requests)}")
for r in requests:
logger.debug(f"Attempting to read_request on request: {r}")
retries = 3
while retries > 0:
logger.debug(f"Retry: {retries}")
exception_flag = False
try:
self.read_request(r)
break # can use break or continue
except ConnectionResetError:
exception_flag = True
logger.warning("ConnectionResetError on read_request()")
logger.warning(f"Error response: {e}")
except ModbusInvalidResponseError as e:
exception_flag = True
logger.warning("ModbusInvalidResponseError on read_request()")
logger.warning(f"Error response: {e}")
except Exception as e:
exception_flag = True
logger.warning("CATCHING ALL EXCEPTIONS")
logger.warning(f"Error response: {e}")
# if exception_flag:
# logger.warning("CLOSING SOCKET CONNECTION")
# self.client.close()
# gevent.sleep(1.0)
# logger.warning("OPENING SOCKET CONNECTION")
# self.client.open()
retries -= 1

if retries == 0:
logger.debug(f"Failed to read request: {r}")
else:
logger.debug(f"Succesfully read the request on retry: {retries}")
logger.debug(f"left lock for {self.device_address}:{self.port}-{self.slave_address}")

@timer(logger)
def dump_all(self):
self.read_all()
return [(f,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def inverse_func(value):
if value < 0:
return (0 - (value / float(multiplier))) - 0xFFFF
else:
return (value / float(multipliers))
return (value / float(multiplier))
except TypeError: #string
return value
except ZeroDivisionError:
Expand Down
2 changes: 1 addition & 1 deletion volttron/platform/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def main(argv=sys.argv):
help='Directory to create the new agent in (must not exist).')
init_parser.add_argument('module_name',
help='Module name for agent. Class name is derived from this name.')
init_parser.add_argument('--template', choices=_get_agent_template_list(),
init_parser.add_argumentq('--template', choices=_get_agent_template_list(),
help='Name of the template to use. Defaults to "common".')
init_parser.add_argument('--identity',
help='Set agent to have a fixed VIP identity. Useful for new service agents.')
Expand Down
Loading