-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added list read/write for symbols #268
Open
RobertoRoos
wants to merge
6
commits into
stlehmann:master
Choose a base branch
from
RobertoRoos:feature/symbol_list
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 3 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
ec4e338
Added read/write by list feature for symbols (this branch was rebased…
RobertoRoos eb887d7
Started with new unit tests for list feature
RobertoRoos 6563a60
Split code into two levels inside Connection and made pyads_ex list f…
RobertoRoos f001ba2
Moved list name retrieval to own method + ran black on three files
RobertoRoos c408653
Undid black reformatting on old code
RobertoRoos 0b0a83c
Added combined _list interface methods for names and symbols
RobertoRoos File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,13 +68,17 @@ | |
adsGetNetIdForPLC, | ||
adsGetSymbolInfo, | ||
adsSumRead, | ||
adsSumReadBytes, | ||
adsSumWrite, | ||
adsSumWriteBytes, | ||
adsReleaseHandle, | ||
adsSyncReadByNameEx, | ||
adsSyncWriteByNameEx, | ||
adsSyncAddDeviceNotificationReqEx, | ||
adsSyncDelDeviceNotificationReqEx, | ||
adsSyncSetTimeoutEx, | ||
get_value_from_ctype_data, | ||
type_is_string, | ||
) | ||
from .structs import ( | ||
AmsAddr, | ||
|
@@ -92,6 +96,7 @@ | |
bytes_from_dict, | ||
size_of_structure, | ||
) | ||
from .errorcodes import ERROR_CODES | ||
from .symbol import AdsSymbol | ||
from .utils import decode_ads | ||
|
||
|
@@ -543,6 +548,40 @@ def read_by_name( | |
check_length=check_length, | ||
) | ||
|
||
def _read_list( | ||
self, | ||
data_symbols: Dict[str, Union[SAdsSymbolEntry, Tuple]], | ||
ads_sub_commands: int, | ||
structure_defs: Dict[str, StructureDef], | ||
) -> Dict[str, Any]: | ||
"""Perform read for a list of variables. | ||
|
||
See :meth:`Connection.read_list_by_name`. | ||
""" | ||
|
||
data_names = list(data_symbols.keys()) | ||
|
||
# Limit request side, split into multiple if needed | ||
if len(data_names) <= ads_sub_commands: | ||
data_names_list = [data_names] # Turn into list of a single element | ||
else: | ||
data_names_list = _list_slice_generator(data_names, ads_sub_commands) | ||
|
||
return_data: Dict[str, Any] = {} | ||
|
||
for data_names_slice in data_names_list: | ||
|
||
result = adsSumRead(self._port, self._adr, data_names_slice, data_symbols, | ||
list(structure_defs.keys())) # type: ignore | ||
|
||
for data_name, structure_def in structure_defs.items(): # type: ignore | ||
result[data_name] = dict_from_bytes(result[data_name], | ||
structure_def) # type: ignore | ||
|
||
return_data.update(result) | ||
|
||
return return_data | ||
|
||
def read_list_by_name( | ||
self, | ||
data_names: List[str], | ||
|
@@ -571,36 +610,95 @@ def read_list_by_name( | |
structure_defs = {} | ||
|
||
if cache_symbol_info: | ||
new_items = [i for i in data_names if i not in self._symbol_info_cache] | ||
new_items = [name for name in data_names if name not in self._symbol_info_cache] | ||
new_cache = { | ||
i: adsGetSymbolInfo(self._port, self._adr, i) for i in new_items | ||
name: adsGetSymbolInfo(self._port, self._adr, name) for name in new_items | ||
} | ||
self._symbol_info_cache.update(new_cache) | ||
data_symbols = {i: self._symbol_info_cache[i] for i in data_names} | ||
data_symbols = { | ||
name: self._symbol_info_cache[name] for name in data_names | ||
} | ||
else: | ||
data_symbols = { | ||
i: adsGetSymbolInfo(self._port, self._adr, i) for i in data_names | ||
name: adsGetSymbolInfo(self._port, self._adr, name) for name in data_names | ||
} | ||
|
||
def sum_read(port: int, adr: AmsAddr, data_names: List[str], | ||
data_symbols: Dict) -> Dict[str, str]: | ||
result = adsSumRead(port, adr, data_names, data_symbols, | ||
list(structure_defs.keys())) # type: ignore | ||
return self._read_list(data_symbols, ads_sub_commands, structure_defs) | ||
|
||
for data_name, structure_def in structure_defs.items(): # type: ignore | ||
result[data_name] = dict_from_bytes(result[data_name], | ||
structure_def) # type: ignore | ||
def read_list_of_symbols( | ||
self, | ||
symbols: List[AdsSymbol], | ||
ads_sub_commands: int = MAX_ADS_SUB_COMMANDS, | ||
): | ||
"""Read new values for a list of AdsSymbols using a single ADS call. | ||
|
||
return result | ||
The outputs will be returned as a dictionary, but the cache of each symbol will | ||
be updated too. | ||
|
||
if len(data_names) <= ads_sub_commands: | ||
return sum_read(self._port, self._adr, data_names, data_symbols) | ||
Comparable to :meth:`Connection.read_list_by_name`. | ||
See also :class:`pyads.AdsSymbol`. | ||
|
||
:param symbols: List if symbol instances | ||
:param ads_sub_commands: Max. number of symbols per call (see | ||
`read_list_by_name`) | ||
""" | ||
|
||
for symbol in symbols: | ||
if symbol.is_structure: | ||
raise ValueError("Method not available for structured variables") | ||
|
||
# Relying on `adsSumRead()` is tricky, because we do not have the `dataType` | ||
# (integer) for each symbol, we only have the ctypes-type. | ||
|
||
data_symbols = { | ||
symbol.name: (symbol.index_group, symbol.index_offset, | ||
symbol.plc_type) for symbol in symbols | ||
} | ||
|
||
result = self._read_list(data_symbols, ads_sub_commands, {}) | ||
|
||
# Add result to symbols cache | ||
for symbol in symbols: | ||
symbol._value = result[symbol.name] | ||
|
||
return result | ||
|
||
def _write_list( | ||
self, | ||
data_symbols: Dict[Union[SAdsSymbolEntry, Tuple]], | ||
values: Dict[str, Any], | ||
ads_sub_commands: int, | ||
structure_defs: Dict[str, StructureDef], | ||
) -> Dict[str, str]: | ||
"""Write a list of variables. | ||
|
||
See :meth:`write_list_by_name`. | ||
""" | ||
|
||
if structure_defs is None: | ||
structure_defs = {} | ||
|
||
for name, structure_def in structure_defs.items(): | ||
values[name] = bytes_from_dict(values[name], structure_def) | ||
|
||
structured_data_names = list(structure_defs.keys()) | ||
|
||
if len(values) <= ads_sub_commands: | ||
data_names_list = [values] # Turn into array of single element | ||
# return adsSumWrite( | ||
# self._port, self._adr, data_names_and_values, data_symbols, | ||
# structured_data_names | ||
# ) | ||
else: | ||
data_names_list = _dict_slice_generator(values, ads_sub_commands) | ||
|
||
return_data: Dict[str, str] = {} | ||
for data_names_slice in data_names_list: | ||
|
||
result = adsSumWrite(self._port, self._adr, data_names_slice, data_symbols, | ||
structured_data_names) | ||
return_data.update(result) | ||
|
||
return_data: Dict[str, Any] = {} | ||
for data_names_slice in _list_slice_generator(data_names, ads_sub_commands): | ||
return_data.update( | ||
sum_read(self._port, self._adr, data_names_slice, data_symbols) | ||
) | ||
return return_data | ||
|
||
def write_list_by_name( | ||
|
@@ -631,49 +729,73 @@ def write_list_by_name( | |
|
||
""" | ||
if cache_symbol_info: | ||
new_items = [ | ||
i | ||
for i in data_names_and_values.keys() | ||
if i not in self._symbol_info_cache | ||
] | ||
new_items = [name for name in data_names_and_values.keys() | ||
if name not in self._symbol_info_cache] | ||
new_cache = { | ||
i: adsGetSymbolInfo(self._port, self._adr, i) for i in new_items | ||
name: adsGetSymbolInfo(self._port, self._adr, name) for name in new_items | ||
} | ||
self._symbol_info_cache.update(new_cache) | ||
data_symbols = { | ||
i: self._symbol_info_cache[i] for i in data_names_and_values | ||
} | ||
else: | ||
data_symbols = { | ||
i: adsGetSymbolInfo(self._port, self._adr, i) | ||
for i in data_names_and_values.keys() | ||
name: adsGetSymbolInfo(self._port, self._adr, name) | ||
for name in data_names_and_values.keys() | ||
} | ||
|
||
if structure_defs is None: | ||
structure_defs = {} | ||
else: | ||
data_names_and_values = data_names_and_values.copy() # copy so the original does not get modified | ||
|
||
for name, structure_def in structure_defs.items(): | ||
data_names_and_values[name] = bytes_from_dict(data_names_and_values[name], | ||
structure_def) | ||
return self._write_list(data_symbols, data_names_and_values, ads_sub_commands, structure_defs) | ||
|
||
structured_data_names = list(structure_defs.keys()) | ||
def write_list_of_symbols( | ||
self, | ||
symbols_and_values: Dict[AdsSymbol, Any], | ||
ads_sub_commands: int = MAX_ADS_SUB_COMMANDS, | ||
): | ||
"""Write new values to a list of symbols. | ||
|
||
if len(data_names_and_values) <= ads_sub_commands: | ||
return adsSumWrite( | ||
self._port, self._adr, data_names_and_values, data_symbols, | ||
structured_data_names | ||
) | ||
Either specify a dict of symbols, or first set the `_value` property of | ||
each symbol and then pass them as a list. | ||
Comment on lines
+799
to
+800
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. List of symbols |
||
|
||
return_data: Dict[str, str] = {} | ||
for data_names_slice in _dict_slice_generator(data_names_and_values, | ||
ads_sub_commands): | ||
return_data.update( | ||
adsSumWrite(self._port, self._adr, data_names_slice, data_symbols, | ||
structured_data_names) | ||
) | ||
return return_data | ||
For example: | ||
|
||
.. code:: python | ||
|
||
# Using dict | ||
new_data = {symbol1: 3.14, symbol2: False} | ||
plc.write_list_of_symbols(new_data) | ||
|
||
# Using cache | ||
symbol1._value = 3.14 | ||
symbol2._value = False | ||
plc.write_list_of_symbols([symbol1, symbol2]) | ||
|
||
Comparable to :meth:`Connection.write_list_by_name`. | ||
See also :class:`pyads.AdsSymbol`. | ||
|
||
:param symbols_and_values: Symbols to write to | ||
:param ads_sub_commands: Max. number of symbols per call (see | ||
`write_list_by_name`) | ||
""" | ||
|
||
for symbol in symbols_and_values.keys(): | ||
if symbol.is_structure: | ||
raise ValueError("Method not available for structured variables") | ||
|
||
data_symbols = { | ||
symbol.name: (symbol.index_group, symbol.index_offset, | ||
symbol.plc_type) for symbol in symbols_and_values.keys() | ||
} | ||
|
||
data_names_and_values = {symbol.name: value for symbol, value in | ||
symbols_and_values.items()} | ||
|
||
return self._write_list(data_symbols, data_names_and_values, | ||
ads_sub_commands, {}) | ||
|
||
def read_structure_by_name( | ||
self, | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i
always makes me think of a numeric key, so I wanted to replace with something more verbose.