Skip to content

Commit

Permalink
Added read/write by list feature for symbols (this branch was rebased…
Browse files Browse the repository at this point in the history
… on top of the refactor)
  • Loading branch information
RobertoRoos committed Oct 19, 2021
1 parent fd1c361 commit ec4e338
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 4 deletions.
138 changes: 138 additions & 0 deletions pyads/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,17 @@
adsGetNetIdForPLC,
adsGetSymbolInfo,
adsSumRead,
adsSumReadBytes,
adsSumWrite,
adsSumWriteBytes,
adsReleaseHandle,
adsSyncReadByNameEx,
adsSyncWriteByNameEx,
adsSyncAddDeviceNotificationReqEx,
adsSyncDelDeviceNotificationReqEx,
adsSyncSetTimeoutEx,
get_value_from_ctype_data,
type_is_string,
)
from .structs import (
AmsAddr,
Expand All @@ -92,6 +96,7 @@
bytes_from_dict,
size_of_structure,
)
from .errorcodes import ERROR_CODES
from .symbol import AdsSymbol
from .utils import decode_ads

Expand Down Expand Up @@ -800,6 +805,139 @@ def write_structure_by_name(
data_name, byte_values, c_ubyte * structure_size, handle=handle
)

def read_list_of_symbols(
self,
symbols: List[AdsSymbol],
ads_sub_commands: int = MAX_ADS_SUB_COMMANDS,
):
"""Read new values for a list of AdsSymbols using a single ADS call.
The outputs will be returned as a dictionary, but the cache of each symbol will
be updated too.
Comparable to :meth:`Connection.read_list_by_name`.
See also :class:`pyads.AdsSymbol`.
:param symbols: List if symbol instances
:param ads_sub_commands: Max. number of symbols per call (see
`read_list_by_name`)
"""

# Relying on `adsSumRead()` is tricky, because we do not have the `dataType`
# (integer) for each symbol, we only have the ctypes-type.
# Instead a very similar low-level read is done.

# Allocate return dict
result = {symbol.name: None for symbol in symbols}

symbol_infos = [(sym.index_group, sym.index_offset, sizeof(sym.plc_type))
for sym in symbols]

sum_response = adsSumReadBytes(self._port, self._adr, symbol_infos)

data_start = 4 * len(symbols)
offset = data_start

for i, symbol in enumerate(symbols):

if symbol.is_structure:
raise ValueError("Method not available for structured variables")

# Check if read was successful for this variable
error = struct.unpack_from("<I", sum_response, offset=i * 4)[0]
if error:
result[symbol.name] = ERROR_CODES[error]
else:
obj = symbol.plc_type.from_buffer(sum_response, offset)

value = get_value_from_ctype_data(obj, symbol.plc_type)

result[symbol.name] = value
symbol._value = value # Update symbol cache

offset += sizeof(symbol.plc_type)

return result

def write_list_of_symbols(
self,
symbols: Union[List[AdsSymbol], Dict[AdsSymbol, Any]],
):
"""Write new values to a list of symbols.
Either specify a dict of symbols, or first set the `_value` property of
each symbol and then pass them as a list.
For example:
.. code:: python
# Using dict
new_data = {symbol1: 3.14, symbol2: False}
plc.write_list_of_symbols(new_data)
# Using cache
symbol1._value = 3.14
symbol2._value = False
plc.write_list_of_symbols([symbol1, symbol2])
Comparable to :meth:`Connection.write_list_by_name`.
See also :class:`pyads.AdsSymbol`.
:param symbols: Symbols to write to
"""

# Initialize big buffer with new data to send over
offset = 0
num_requests = len(symbols)
total_request_size = num_requests * 3 * 4 # iGroup, iOffset & size

for symbol in symbols:
total_request_size += sizeof(symbol.plc_type)

buf = bytearray(total_request_size)

for symbol in symbols:
struct.pack_into("<I", buf, offset, symbol.index_group)
struct.pack_into("<I", buf, offset + 4, symbol.index_offset)
struct.pack_into("<I", buf, offset + 8, sizeof(symbol.plc_type))
offset += 12

if isinstance(symbols, dict):
new_values = list(symbols.keys())
else:
new_values = None

for i, symbol in enumerate(symbols):

if symbol.is_structure:
raise ValueError("List write not supported for struct symbols")

new_value = new_values[i] if new_values is not None else symbol._value

# Convert value to bytes and insert into buffer
if type_is_string(symbol.plc_type):
buf[offset: offset + len(new_value)] = new_value.encode("utf-8")
else:
if type(symbol.plc_type).__name__ == "PyCArrayType":
write_data = symbol.plc_type(*new_value)
elif type(new_value) is symbol.plc_type:
write_data = new_value
else:
write_data = symbol.plc_type(new_value)
buf[offset: offset + sizeof(symbol.plc_type)] = bytes(write_data)

offset += sizeof(symbol.plc_type)

error_descriptions = adsSumWriteBytes(
self._port,
self._adr,
num_requests,
buf,
)

return error_descriptions

def add_device_notification(
self,
data: Union[str, Tuple[int, int]],
Expand Down
4 changes: 2 additions & 2 deletions pyads/pyads_ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
ctypes.POINTER(SAdsNotificationHeader),
ctypes.c_ulong,
)

elif platform_is_freebsd():
# try to load local libTcAdsDll.so in favor to global one
local_adslib = os.path.join(os.path.dirname(__file__), "libTcAdsDll.so")
Expand All @@ -110,7 +110,7 @@
ctypes.POINTER(SAdsNotificationHeader),
ctypes.c_ulong,
)

else: # pragma: no cover, can not test unsupported platform
raise RuntimeError("Unsupported platform {0}.".format(sys.platform))

Expand Down
5 changes: 3 additions & 2 deletions pyads/symbol.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

# ads.Connection relies on structs.AdsSymbol (but in type hints only), so use
# this 'if' to only include it when type hinting (False during execution)
if TYPE_CHECKING:
from .ads import Connection, StructureDef # pragma: no cover
if TYPE_CHECKING: # pragma: no cover
from .connection import Connection
from .ads import StructureDef


class AdsSymbol:
Expand Down

0 comments on commit ec4e338

Please sign in to comment.