From f799e0defe14a2acf5c7f2652b3c8f60d83958d1 Mon Sep 17 00:00:00 2001 From: kefeimo Date: Thu, 15 Dec 2022 16:27:18 -0600 Subject: [PATCH] v0.2.3b created interactive station demo (#3) * New feature/improve demo (#1) * clean-up sub-module git * make demo value more intersting * able to build functional wheel using python setup.py bdist_wheel * ready to publish to pypi before clean-up * working setup.py for src structure, able to create tar, so, whl * testpypi enable, note: using stand-alone setup.py * re-anchored deps/dnp3 * changed to forked repo for deps/dnp3 sub-module with ownership * cleaned up for package release * added docs on building wheel * updated notes_on_packaging.md * updated requirements.txt * clean-up, deleted wheel * use local (relative) import for dnp3demo to prevent circular import * cleaned-up dnp3demo import * Resolved master and outstation not shutdown gracefully issue. * resolved master outstation not able to shutdown gracefully * allowed master and outstation to shutdown gracefully * developed cli tool for dnp3dnp3 * New feature/improve outstation (#3) * added auxilary db for outstation, updated demo to display db_handler usage * minor cleaned up outstation * Hot fix/multi outstation (#4) * enabled mutli outstation by introducing pool, TODO: clean-up * minor cleanup * added db to SOEHandler * added send_scan_all_request, added control-workflow master demo * updated dnp3demo main to include interactive master * added communication status check, added get_config printout * scoffolding subcommand args with argparse * added interactive master * finished interactive outstation * cleanup for 0.2.3b, created interactive stations --- README.md | 23 ++ setup.py | 2 +- src/dnp3_python/dnp3station/master_new.py | 204 ++++++----------- src/dnp3_python/dnp3station/outstation_new.py | 210 ++++++++++++------ .../dnp3station/outstation_utils.py | 56 ----- src/dnp3_python/dnp3station/station_utils.py | 187 ++++++++++++---- src/dnp3demo/__main__.py | 60 +++-- src/dnp3demo/control_workflow_demo.py | 2 +- src/dnp3demo/control_workflow_demo_master.py | 111 +++++++++ src/dnp3demo/data_retrieval_demo_master.py | 15 +- .../data_retrieval_demo_outstation.py | 11 +- src/dnp3demo/run_master.py | 173 +++++++++++++++ src/dnp3demo/run_outstation.py | 210 ++++++++++++++++++ 13 files changed, 931 insertions(+), 333 deletions(-) delete mode 100644 src/dnp3_python/dnp3station/outstation_utils.py create mode 100644 src/dnp3demo/control_workflow_demo_master.py create mode 100644 src/dnp3demo/run_master.py create mode 100644 src/dnp3demo/run_outstation.py diff --git a/README.md b/README.md index 0b2837a..c66424d 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,29 @@ ms(1666217819745) INFO server - Accepted connection from: 127.0.0.1 ``` +> **_NOTE:_** Use `python -m dnp3demo -h` to see demo options + +``` +$ python -m dnp3demo -h + +Basic dnp3 use case demo + +optional arguments: + -h, --help show this help message and exit + -d sec, --duration sec + Configure demo duration (in seconds.) + -rm, --run-master-station + Run a standalone master station. + -ro, --run-outstation + Run a standalone master station. + -dg, --demo-get-point + Demo get point workflow. + -ds, --demo-set-point + Demo set point workflow. + + +``` + ## For Developers pydnp3 is a thin wrapper around opendnp3 classes. Documentation for the opendnp3 diff --git a/setup.py b/setup.py index 8345368..3f356a7 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ from distutils.version import LooseVersion from setuptools import find_packages, find_namespace_packages -__version__ = '0.2.0' +__version__ = '0.2.3b' class CMakeExtension(Extension): diff --git a/src/dnp3_python/dnp3station/master_new.py b/src/dnp3_python/dnp3station/master_new.py index d24463d..0375658 100644 --- a/src/dnp3_python/dnp3station/master_new.py +++ b/src/dnp3_python/dnp3station/master_new.py @@ -36,7 +36,6 @@ class MyMasterNew: """ - Interface for all master application callback info except for measurement values. (TODO: where is and how to get measurement values then?) DNP3 spec section 5.1.6.1: The Application Layer provides the following services for the DNP3 User Layer in a master: @@ -143,8 +142,7 @@ def __init__(self, # ALL_COMMS = 130720 # NORMAL = 15 # INFO, WARN # NOTHING = 0 - # TODO: Rewrite opendnp3.levels note: wild guess, 7: warning, 15 (opendnp3.levels.NORMAL): info - _log.debug('Configuring log level') # TODO: provide more info. Right now this log is not very useful + # _log.debug('Configuring log level') self.channel_log_level: opendnp3.levels = channel_log_level self.master_log_level: opendnp3.levels = master_log_level @@ -153,6 +151,54 @@ def __init__(self, # self.channel.SetLogFilters(openpal.LogFilters(opendnp3.levels.ALL_COMMS)) # self.master.SetLogFilters(openpal.LogFilters(opendnp3.levels.ALL_COMMS)) + # configuration info + self._comm_conifg = { + "masterstation_ip_str": masterstation_ip_str, + "outstation_ip_str": outstation_ip_str, + "port": port, + "masterstation_id_int": masterstation_id_int, + "outstation_id_int": outstation_id_int, + } + + def get_address_id_statics(self): + """Note this is not working: i.e., the value numUnknownDestination is always 0""" + # print("numUnknownDestination", master_application.master.GetStackStatistics().link.numUnknownDestination) + # print("numUnknownSource", master_application.master.GetStackStatistics().link.numUnknownSource) + pass + + @property + def channel_statistic(self): + """statistics of channel connection actions + numOpen: number of times that (successfully) open a connection + numOpenFail: number of fail attempts to open a connection + numClose: number of such once-open-later-close connections + + Note: when there is 1-to-1 mapping from channel to station, then + numOpen - numClose == 1 => SUCCESS + numOpen - numClose == 0 => FAIL + """ + return { + "numOpen": self.channel.GetStatistics().channel.numOpen, + "numOpenFail": self.channel.GetStatistics().channel.numOpenFail, + "numClose": self.channel.GetStatistics().channel.numClose} + + @property + def is_connected(self): + """ + Note: when there is 1-to-1 mapping from channel to station, then + numOpen - numClose == 1 => SUCCESS + numOpen - numClose == 0 => FAIL + """ + if self.channel_statistic.get("numOpen") - self.channel_statistic.get("numClose") == 1: + return True + else: + return False + + def get_config(self): + """print out the configuration + example""" + return self._comm_conifg + def send_direct_operate_command(self, command: Union[opendnp3.ControlRelayOutputBlock, opendnp3.AnalogOutputInt16, @@ -184,7 +230,7 @@ def send_direct_operate_command_set(self, command_set, callback=asiodnp3.Printin self.master.DirectOperate(command_set, callback, config) def send_select_and_operate_command(self, command, index, callback=asiodnp3.PrintingCommandCallback.Get(), - config=opendnp3.TaskConfig().Default()): # TODO: compare to send_direct_operate_command, what's the difference + config=opendnp3.TaskConfig().Default()): """ Select and operate a single command Note: send_direct_operate_command will evoke outstation side def process_point_value TWICE as side effect @@ -197,7 +243,7 @@ def send_select_and_operate_command(self, command, index, callback=asiodnp3.Prin self.master.SelectAndOperate(command, index, callback, config) def send_select_and_operate_command_set(self, command_set, callback=asiodnp3.PrintingCommandCallback.Get(), - config=opendnp3.TaskConfig().Default()): # TODO: compare to send_direct_operate_command_set, what's the difference + config=opendnp3.TaskConfig().Default()): """ Select and operate a set of commands @@ -207,142 +253,6 @@ def send_select_and_operate_command_set(self, command_set, callback=asiodnp3.Pri """ self.master.SelectAndOperate(command_set, callback, config) - # def retrieve_all_obj_by_gvid(self, gv_id: opendnp3.GroupVariationID, - # config=opendnp3.TaskConfig().Default() - # ) -> Dict[opendnp3.GroupVariation, Dict[int, DbPointVal]]: - # """ - # Deprecated. (Use retrieve_val_by_gv instead) - # Retrieve point value (from an outstation databse) based on gvId (Group Variation ID). - # - # Common gvId: ref: dnp3 Namespace Reference: https://docs.stepfunc.io/dnp3/0.9.0/dotnet/namespacednp3.html - # TODO: rewrite opendnp3.GroupVariationID to add docstring - # for static state - # GroupVariationID(30, 6): Analog input - double-precision, floating-point with flag - # GroupVariationID(30, 1): Analog input - 32-bit with flag - # GroupVariationID(1, 2): Binary input - with flags - # - # GroupVariationID(40, 4): Analog Output Status - Double-precision floating point with flags - # GroupVariationID(40, 1): Analog Output Status - 32-bit with flags - # GroupVariationID(10, 2): Binary Output - With flags - # for event - # GroupVariationID(32, 4): Analog Input Event - 16-bit with time - # GroupVariationID(2, 2): Binary Input Event - With absolute time - # GroupVariationID(42, 8): Analog Output Event - Double-preicions floating point with time - # GroupVariationID(11, 2): Binary Output Event - With time - # - # :param opendnp3.GroupVariationID gv_id: group-variance Id - # :param opendnp3.TaskConfig config: Task configuration. Default: opendnp3.TaskConfig().Default() - # - # :return: retrieved point values stored in a nested dict. - # :rtype: Dict[opendnp3.GroupVariation, Dict[int, DbPointVal]] - # - # :example: - # >>> # prerequisite: outstation db properly configured and updated, master_application properly initialized - # >>> master_application.retrieve_all_obj_by_gvid(gv_id=opendnp3.GroupVariationID(30, 6)) - # GroupVariation.Group30Var6: {0: 4.8, 1: 12.1, 2: 24.2, 3: 0.0}} - # """ - # # self.master.ScanRange(gvId=opendnp3.GroupVariationID(30, 1), start=0, stop=3, - # # config=opendnp3.TaskConfig().Default()) - # # self.master.ScanRange(gvId=gvId, start=index_start, stop=index_stop, - # # config=config) - # - # # self.master.ScanAllObjects(gvId=gvid, - # # config=config) - # # gv_cls: opendnp3.GroupVariation = parsing_gvid_to_gvcls(gvid) - # # db_val = {gv_cls: self.soe_handler.gv_index_value_nested_dict.get(gv_cls)} - # - # # TODO: refactor hard-coded retry and sleep, allow config - # # TODO: "prettify" the following while loop workflow. e.g., helper function + recurrent function - # self.master.ScanAllObjects(gvId=gv_id, - # config=config) - # gv_cls: opendnp3.GroupVariation = parsing_gvid_to_gvcls(gv_id) - # gv_db_val = self.soe_handler.gv_index_value_nested_dict.get(gv_cls) - # - # # retry logic to improve performance - # retry_max = self.num_polling_retry - # n_retry = 0 - # sleep_delay = self.delay_polling_retry - # while gv_db_val is None and n_retry < retry_max: - # self.master.ScanAllObjects(gvId=gv_id, - # config=config) - # # gv_cls: opendnp3.GroupVariation = parsing_gvid_to_gvcls(gv_id) - # time.sleep(sleep_delay) - # gv_db_val = self.soe_handler.gv_index_value_nested_dict.get(gv_cls) - # n_retry += 1 - # # print("=======n_retry, gv_db_val, gv_cls", n_retry, gv_db_val, gv_cls) - # # print("=======self.soe_handler", self.soe_handler) - # # print("=======self.soe_handler.gv_index_value_nested_dict id", self.soe_handler.gv_index_value_nested_dict, - # # id(self.soe_handler.gv_index_value_nested_dict)) - # - # if n_retry >= retry_max: - # _log.warning("==Retry numbers hit retry limit {}==".format(retry_max)) - # - # return {gv_cls: gv_db_val} - - # def retrieve_all_obj_by_gvids(self, - # gv_ids: Optional[List[opendnp3.GroupVariationID]] = None, - # config=opendnp3.TaskConfig().Default() - # ) -> Dict[opendnp3.GroupVariation, Dict[int, DbPointVal]]: - # """ - # Deprecated. (encorage the user to use retrieve_val_by_gv instead) - # - # Retrieve point value (from an outstation databse) based on gvId (Group Variation ID). - # - # Common gvId: ref: dnp3 Namespace Reference: https://docs.stepfunc.io/dnp3/0.9.0/dotnet/namespacednp3.html - # TODO: rewrite opendnp3.GroupVariationID to add docstring - # for static state - # GroupVariationID(30, 6): Analog input - double-precision, floating-point with flag - # GroupVariationID(30, 1): Analog input - 32-bit with flag - # GroupVariationID(1, 2): Binary input - with flags - # - # GroupVariationID(40, 4): Analog Output Status - Double-precision floating point with flags - # GroupVariationID(40, 1): Analog Output Status - 32-bit with flags - # GroupVariationID(10, 2): Binary Output - With flags - # for event - # GroupVariationID(32, 4): Analog Input Event - 16-bit with time - # GroupVariationID(2, 2): Binary Input Event - With absolute time - # GroupVariationID(42, 8): Analog Output Event - Double-preicions floating point with time - # GroupVariationID(11, 2): Binary Output Event - With time - # - # :param opendnp3.GroupVariationID gv_ids: list of group-variance Id - # :param opendnp3.TaskConfig config: Task configuration. Default: opendnp3.TaskConfig().Default() - # - # :return: retrieved point values stored in a nested dict. - # :rtype: Dict[opendnp3.GroupVariation, Dict[int, DbPointVal]] - # - # :example: - # >>> # prerequisite: outstation db properly configured and updated, master_application properly initialized - # >>> master_application.retrieve_all_obj_by_gvids() # using default - # GroupVariation.Group30Var6: {0: 4.8, 1: 12.1, 2: 24.2, 3: 0.0}} - # """ - # - # gv_ids: Optional[List[opendnp3.GroupVariationID]] - # if gv_ids is None: # using default - # # GroupVariationID(30, 6): Analog input - double-precision, floating-point with flag - # # GroupVariationID(1, 2): Binary input - with flags - # # GroupVariationID(40, 4): Analog Output Status - Double-precision floating point with flags - # # GroupVariationID(10, 2): Binary Output - With flags - # - # # GroupVariationID(32, 4): Analog Input Event - 16-bit with time - # # GroupVariationID(2, 2): Binary Input Event - With absolute time - # # GroupVariationID(42, 8): Analog Output Event - Double-preicions floating point with time - # # GroupVariationID(11, 2): Binary Output Event - With time - # gv_ids = [GroupVariationID(30, 6), - # GroupVariationID(1, 2), - # GroupVariationID(40, 4), - # GroupVariationID(10, 2), - # # GroupVariationID(32, 4), - # # GroupVariationID(2, 2), - # # GroupVariationID(42, 8), - # # GroupVariationID(11, 2), - # ] - # filtered_db: Dict[opendnp3.GroupVariation, Dict[int, DbPointVal]] = {} - # for gv_id in gv_ids: - # self.retrieve_all_obj_by_gvid(gv_id=gv_id, config=config) - # gv_cls: opendnp3.GroupVariation = parsing_gvid_to_gvcls(gv_id) - # filtered_db.update({gv_cls: self.soe_handler.gv_index_value_nested_dict.get(gv_cls)}) - # return filtered_db - def _retrieve_all_obj_by_gvids_w_ts(self, gv_ids: Optional[List[opendnp3.GroupVariationID]] = None, config=opendnp3.TaskConfig().Default() @@ -628,3 +538,15 @@ def __del__(self): self.shutdown() except AttributeError: pass + + def send_scan_all_request(self, gv_ids: List[opendnp3.GroupVariationID] = None): + """send requests to retrieve all point values, if gv_ids not provided then use default """ + config = opendnp3.TaskConfig().Default() + if gv_ids is None: + gv_ids = [GroupVariationID(group=30, variation=6), + GroupVariationID(group=40, variation=4), + GroupVariationID(group=1, variation=2), + GroupVariationID(group=10, variation=2)] + for gv_id in gv_ids: + self.master.ScanAllObjects(gvId=gv_id, + config=config) diff --git a/src/dnp3_python/dnp3station/outstation_new.py b/src/dnp3_python/dnp3station/outstation_new.py index f7873ab..7b1aa56 100644 --- a/src/dnp3_python/dnp3station/outstation_new.py +++ b/src/dnp3_python/dnp3station/outstation_new.py @@ -1,14 +1,18 @@ +from __future__ import annotations + import logging import sys +import pydnp3.asiopal from pydnp3 import opendnp3, openpal, asiopal, asiodnp3 import time -from typing import Union, Type +from typing import Union, Type, Dict from .station_utils import master_to_outstation_command_parser from .station_utils import OutstationCmdType, MasterCmdType # from .outstation_utils import MeasurementType +from .station_utils import DBHandler LOG_LEVELS = opendnp3.levels.NORMAL | opendnp3.levels.ALL_COMMS LOCAL_IP = "0.0.0.0" @@ -21,7 +25,7 @@ _log = logging.getLogger(__name__) _log.addHandler(stdout_stream) _log.setLevel(logging.DEBUG) -# _log.setLevel(logging.ERROR) # TODO: encapsulate this +# _log.setLevel(logging.ERROR) _log.setLevel(logging.INFO) # alias @@ -54,7 +58,11 @@ class MyOutStationNew(opendnp3.IOutstationApplication): into outgoing messages. """ - outstation = None + # outstation = None + # db_handler = None + outstation_application = None + # outstation_pool = {} # a pool of outstations + outstation_application_pool: Dict[str, MyOutStationNew] = {} # a pool of outstation applications def __init__(self, masterstation_ip_str: str = "0.0.0.0", @@ -85,19 +93,7 @@ def __init__(self, # Note: dbconfig signature at cpp/libs/include/asiodnp3/DatabaseConfig.h # which has sizes parameter self.configure_database(self.stack_config.dbConfig) # TODO: refactor it to outside of the class. - # print("=====verify self.stack_config.dbConfig.analog[0].svariation", - # self.stack_config.dbConfig.analog[0].svariation) - # self.stack_config.dbConfig.analog[0].svariation = opendnp3.StaticAnalogVariation.Group30Var5 - # print("=====verify again self.stack_config.dbConfig.analog[0].svariation", - # self.stack_config.dbConfig.analog[0].svariation) - # - # print("====self.stack_config.dbConfig.analog[0].svariation type", type(self.stack_config.dbConfig.analog[0].svariation )) - # - # # print("=======experiment", self.stack_config.dbConfig.binary.toView) - # print("=======after self.stack_config", self.stack_config.dbConfig) - # print("=======experiment", self.stack_config.dbConfig.sizes.numAnalog) - # threads_to_allocate = 1 # self.log_handler = MyLogger() self.log_handler = asiodnp3.ConsoleLogger().Create() # (or use this during regression testing) # self.manager = asiodnp3.DNP3Manager(threads_to_allocate, self.log_handler) @@ -122,16 +118,23 @@ def __init__(self, listener=self.listener) _log.debug('Adding the outstation to the channel.') - self.command_handler = OutstationCommandHandler() + self.outstation_id = outstation_ip_str + "-" + str(port) + # self.command_handler = OutstationCommandHandler() + self.command_handler = MyOutstationCommandHandler() + # Note: use post init to link outstation application and OutstationCommandHandler instance(object) + self.command_handler.post_init(outstation_id=self.outstation_id) # self.command_handler = opendnp3.SuccessCommandHandler().Create() # (or use this during regression testing) - # self.outstation_application = OutstationApplication() - self.outstation_application = self - self.outstation = self.channel.AddOutstation(id="outstation", + # init outstation applicatioin, # Note: singleton for AddOutstation() + MyOutStationNew.set_outstation_application(outstation_application=self) + + # finally, init outstation + self.outstation = self.channel.AddOutstation(id="outstation-" + self.outstation_id, commandHandler=self.command_handler, - # application=self, - application=self.outstation_application, + application=MyOutStationNew.outstation_application, config=self.stack_config) + MyOutStationNew.add_outstation_app(outstation_id=self.outstation_id, outstation_app=self.outstation_application) + # Configure log level for channel(tcpclient) and outstation # note: one of the following # ALL = -1 @@ -139,7 +142,7 @@ def __init__(self, # ALL_COMMS = 130720 # NORMAL = 15 # NOTHING = 0 - # TODO: add def set_channel_log_level, def set_master_log_level + _log.debug('Configuring log level') self.channel_log_level: opendnp3.levels = channel_log_level self.outstation_log_level: opendnp3.levels = outstation_log_level @@ -150,10 +153,76 @@ def __init__(self, # self.master.SetLogFilters(openpal.LogFilters(opendnp3.levels.ALL_COMMS)) # Put the Outstation singleton in OutstationApplication so that it can be used to send updates to the Master. - MyOutStationNew.set_outstation(self.outstation) # TODO: change MyOutStationNew to cls + # MyOutStationNew.set_outstation(self.outstation) # Note: this needs to be self.outstation (not cls.outstation) + # + self.db_handler = DBHandler(stack_config=self.stack_config) + # MyOutStationNew.set_db_handler(self.db_handler) + + # configuration info + self._comm_conifg = { + "masterstation_ip_str": masterstation_ip_str, + "outstation_ip_str": outstation_ip_str, + "port": port, + "masterstation_id_int": masterstation_id_int, + "outstation_id_int": outstation_id_int, + } + + @property + def channel_statistic(self): + """statistics of channel connection actions + numOpen: number of times that (successfully) open a connection + numOpenFail: number of fail attempts to open a connection + numClose: number of such once-open-later-close connections + + Note: when there is 1-to-1 mapping from channel to station, then + numOpen - numClose == 1 => SUCCESS + numOpen - numClose == 0 => FAIL + """ + return { + "numOpen": self.channel.GetStatistics().channel.numOpen, + "numOpenFail": self.channel.GetStatistics().channel.numOpenFail, + "numClose": self.channel.GetStatistics().channel.numClose} + + @property + def is_connected(self): + """ + Note: when there is 1-to-1 mapping from channel to station, then + numOpen - numClose == 1 => SUCCESS + numOpen - numClose == 0 => FAIL + """ + if self.channel_statistic.get("numOpen") - self.channel_statistic.get("numClose") == 1: + return True + else: + return False + def get_config(self): + """print out the configuration + example""" + return self._comm_conifg + + @classmethod + def add_outstation_app(cls, outstation_id: str, outstation_app: MyOutStationNew): + """add outstation instance to outstation pool, + the id is in the format of `ip-port`, e.g., `0.0.0.0-20000`.""" + cls.outstation_application_pool[outstation_id] = outstation_app + + @classmethod + def get_outstation_app(cls, outstation_id: str) -> MyOutStationNew: + """get outstation instance from the outstation pool using outstation id, + the id is in the format of `ip-port`, e.g., `0.0.0.0-20000`.""" + return cls.outstation_application_pool.get(outstation_id) + + @classmethod + def set_outstation_application(cls, outstation_application): + """ + use singleton + Note: at this version,needs to keep this function + """ + if cls.outstation_application: + pass + else: + cls.outstation_application = outstation_application - # @staticmethod def configure_stack(self): """Set up the OpenDNP3 configuration.""" stack_config = asiodnp3.OutstationStackConfig(opendnp3.DatabaseSizes.AllTypes(10)) @@ -237,30 +306,13 @@ def shutdown(self): Process finished with exit code 134 (interrupted by signal 6: SIGABRT) """ time.sleep(2) # Note: hard-coded sleep to avoid hanging process - _outstation = self.get_outstation() + # _outstation = self.get_outstation() + _outstation = self.outstation _outstation.Shutdown() # del _outstation self.channel.Shutdown() - # self.manager.Shutdown() - - @classmethod # TODO: Justify the necessity to use class method - def get_outstation(cls): - """Get the singleton instance of IOutstation.""" - return cls.outstation - - @classmethod - def set_outstation(cls, outstn): - """ - Set the singleton instance of IOutstation, as returned from the channel's AddOutstation call. - - Making IOutstation available as a singleton allows other classes (e.g. the command-line UI) - to send commands to it -- see apply_update(). - """ - cls.outstation = outstn - - @classmethod - def process_point_value(cls, command_type, command, index, op_type): + def process_point_value(self, command_type, command, index, op_type): """ A PointValue was received from the Master. Process its payload then up database (For control workflow) Note: parse master operation command to outstation update command, then reuse apply_update. @@ -281,10 +333,11 @@ def process_point_value(cls, command_type, command, index, op_type): # Note: print("command value ", command.value) for AnalogOutput/AnalogOutputDouble64, etc. outstation_cmd = master_to_outstation_command_parser(command) # then reuse apply_update - cls.apply_update(outstation_cmd, index) + # cls.apply_update(outstation_cmd, index) + self.apply_update(outstation_cmd, index) - @classmethod - def apply_update(cls, + # @classmethod + def apply_update(self, measurement: OutstationCmdType, index): """ @@ -303,7 +356,11 @@ def apply_update(cls, # builder.Update(measurement, index) # update = builder.Build() update = asiodnp3.UpdateBuilder().Update(measurement, index).Build() - cls.get_outstation().Apply(update) + # cls.get_outstation().Apply(update) + self.outstation.Apply(update) + + # cls.db_handler.process(measurement, index) + self.db_handler.process(measurement, index) def __del__(self): try: @@ -312,15 +369,25 @@ def __del__(self): pass -class OutstationCommandHandler(opendnp3.ICommandHandler): +class MyOutstationCommandHandler(opendnp3.ICommandHandler): """ Override ICommandHandler in this manner to implement application-specific command handling. ICommandHandler implements the Outstation's handling of Select and Operate, which relay commands and data from the Master to the Outstation. + + Note: this class CANNOT implement init """ - outstation_application = MyOutStationNew + # outstation_application = MyOutStationNew + outstation_id = "" + + # def __init__(self, outstation_id="some-id"): + # self.outstation_id = outstation_id + + def post_init(self, outstation_id, **kwargs): + """helper function to pass values, e.g., outstation_id""" + self.outstation_id = outstation_id def Start(self): _log.debug('In OutstationCommandHandler.Start') @@ -338,10 +405,15 @@ def Select(self, command, index): :param index: int :return: CommandStatus """ - # print("===========command, ", command) - # print("===========index, ", index) - self.outstation_application.process_point_value('Select', command, index, None) - return opendnp3.CommandStatus.SUCCESS + outstation_application_pool = MyOutStationNew.outstation_application_pool + outstation_app = outstation_application_pool.get(self.outstation_id) + + try: + outstation_app.process_point_value('Select', command, index, None) + return opendnp3.CommandStatus.SUCCESS + except Exception as e: + _log.error(e) + raise e def Operate(self, command, index, op_type): """ @@ -355,11 +427,16 @@ def Operate(self, command, index, op_type): :param op_type: OperateType :return: CommandStatus """ - # print("Operate===========command, ", command) - # print("Operate===========index, ", index) - # print("Operate===========op_type, ", op_type) - self.outstation_application.process_point_value('Operate', command, index, op_type) - return opendnp3.CommandStatus.SUCCESS + + outstation_application_pool = MyOutStationNew.outstation_application_pool + outstation_app = outstation_application_pool.get(self.outstation_id) + try: + # self.outstation_application.process_point_value('Operate', command, index, op_type) + outstation_app.process_point_value('Operate', command, index, op_type) + return opendnp3.CommandStatus.SUCCESS + except Exception as e: + _log.error(e) + raise e class AppChannelListener(asiodnp3.IChannelListener): @@ -372,18 +449,3 @@ def __init__(self): def OnStateChange(self, state): _log.debug('In AppChannelListener.OnStateChange: state={}'.format(state)) - - -class MyLogger(openpal.ILogHandler): - """ - Override ILogHandler in this manner to implement application-specific logging behavior. - """ - - def __init__(self): - super(MyLogger, self).__init__() - - def Log(self, entry): - filters = entry.filters.GetBitfield() - location = entry.location.rsplit('/')[-1] if entry.location else '' - message = entry.message - _log.debug('Log\tfilters={}\tlocation={}\tentry={}'.format(filters, location, message)) diff --git a/src/dnp3_python/dnp3station/outstation_utils.py b/src/dnp3_python/dnp3station/outstation_utils.py deleted file mode 100644 index 0952ff6..0000000 --- a/src/dnp3_python/dnp3station/outstation_utils.py +++ /dev/null @@ -1,56 +0,0 @@ -import datetime -import logging -import sys -import time - -from pydnp3 import opendnp3, openpal, asiopal, asiodnp3 -from .visitors import * -from pydnp3.opendnp3 import GroupVariation, GroupVariationID - -from typing import Callable, Union, Dict, Tuple, List, Optional, Type, TypeVar - -from .station_utils import MasterCmdType - -stdout_stream = logging.StreamHandler(sys.stdout) -stdout_stream.setFormatter(logging.Formatter('%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')) - -_log = logging.getLogger(__name__) -_log.addHandler(stdout_stream) -_log.setLevel(logging.DEBUG) -# _log.setLevel(logging.INFO) - -# alias -OutstationCmdType = Union[opendnp3.Analog, opendnp3.Binary, opendnp3.AnalogOutputStatus, - opendnp3.BinaryOutputStatus] # based on asiodnp3.UpdateBuilder.Update(**args) -# MasterCmdType = Union[opendnp3.AnalogOutputDouble64, -# opendnp3.AnalogOutputFloat32, -# opendnp3.AnalogOutputInt32, -# opendnp3.AnalogOutputInt16, -# opendnp3.ControlRelayOutputBlock] -MeasurementType = TypeVar("MeasurementType", bound=opendnp3.Measurement) # inheritance, e.g., opendnp3.Analog, - -# TODO: combine outstation util with master_utils - -def master_to_outstation_command_parser(master_cmd: MasterCmdType) -> OutstationCmdType: - """ - Used to parse send command to update command, e.g., opendnp3.AnalogOutputDouble64 -> AnalogOutputStatus - """ - # return None - if type(master_cmd) in [opendnp3.AnalogOutputDouble64, - opendnp3.AnalogOutputFloat32, - opendnp3.AnalogOutputInt32, - opendnp3.AnalogOutputInt16]: - return opendnp3.AnalogOutputStatus(value=master_cmd.value) - elif type(master_cmd) is opendnp3.ControlRelayOutputBlock: - # Note: ControlRelayOutputBlock requires to use hard-coded rawCode to retrieve value at this version. - bi_value: bool - if master_cmd.rawCode == 3: - bi_value = True - elif master_cmd.rawCode == 4: - bi_value = False - else: - raise ValueError( - f"master_cmd.rawCode {master_cmd.rawCode} is not a valid rawCode. (3: On/True, 4:Off/False.") - return opendnp3.BinaryOutputStatus(value=bi_value) - else: - raise ValueError(f"master_cmd {master_cmd} with type {type(master_cmd)} is not a valid command.") diff --git a/src/dnp3_python/dnp3station/station_utils.py b/src/dnp3_python/dnp3station/station_utils.py index 11c37cb..9d36532 100644 --- a/src/dnp3_python/dnp3station/station_utils.py +++ b/src/dnp3_python/dnp3station/station_utils.py @@ -38,24 +38,30 @@ opendnp3.AnalogOutputInt16, opendnp3.ControlRelayOutputBlock] +OutstationCmdType = Union[opendnp3.Analog, + opendnp3.AnalogOutputStatus, + opendnp3.Binary, + opendnp3.BinaryOutputStatus] + +MeasurementType = TypeVar("MeasurementType", + bound=opendnp3.Measurement) # inheritance, e.g., opendnp3.Analog, # TODO: add validating connection logic # TODO: add validating configuration logic # (e.g., check if db at outstation side is configured correctly, i.e., OutstationStackConfig) -class MyLogger(openpal.ILogHandler): - """ - Override ILogHandler in this manner to implement application-specific logging behavior. - """ - def __init__(self): - super(MyLogger, self).__init__() - def Log(self, entry): - flag = opendnp3.LogFlagToString(entry.filters.GetBitfield()) - filters = entry.filters.GetBitfield() - location = entry.location.rsplit('/')[-1] if entry.location else '' - message = entry.message - _log.debug('LOG\t\t{:<10}\tfilters={:<5}\tlocation={:<25}\tentry={}'.format(flag, filters, location, message)) +class HandlerLogger: + def __int__(self, logger_name="HandlerLogger"): + self._log = logging.getLogger(logger_name) + self.config_logger() + + def get_logger(self): + return self._log + + def config_logger(self, log_level=logging.INFO): + self._log.addHandler(stdout_stream) + self._log.setLevel(log_level) class AppChannelListener(asiodnp3.IChannelListener): @@ -77,15 +83,25 @@ class SOEHandler(opendnp3.ISOEHandler): This is an interface for SequenceOfEvents (SOE) callbacks from the Master stack to the application layer. """ - # TODO: refactor to its own module def __init__(self, soehandler_log_level=logging.INFO, *args, **kwargs): super(SOEHandler, self).__init__() + + # auxiliary database self._gv_index_value_nested_dict: Dict[GroupVariation, Optional[Dict[int, DbPointVal]]] = {} self._gv_ts_ind_val_dict: Dict[GroupVariation, Tuple[datetime.datetime, Optional[Dict[int, DbPointVal]]]] = {} - _log.setLevel(soehandler_log_level) # TODO: refactor to its own module (right now thi si global) - self._gv_last_poll_dict: Dict[GroupVariation, Optional[datetime.datetime]] = {} + # logging + self.logger = logging.getLogger(self.__class__.__name__) + self.config_logger(log_level=soehandler_log_level) + + # db + self._db = self.init_db() + + def config_logger(self, log_level=logging.INFO): + self.logger.addHandler(stdout_stream) + self.logger.setLevel(log_level) + def Process(self, info, values: ICollectionIndexedVal, *args, **kwargs): @@ -143,12 +159,15 @@ def Process(self, info, # visitor.index_and_value: List[Tuple[int, DbPointVal]] for index, value in visitor.index_and_value: log_string = 'SOEHandler.Process {0}\theaderIndex={1}\tdata_type={2}\tindex={3}\tvalue={4}' - _log.debug(log_string.format(info.gv, info.headerIndex, type(values).__name__, index, value)) + self.logger.debug(log_string.format(info.gv, info.headerIndex, type(values).__name__, index, value)) # print(log_string.format(info.gv, info.headerIndex, type(values).__name__, index, value)) info_gv: GroupVariation = info.gv visitor_ind_val: List[Tuple[int, DbPointVal]] = visitor.index_and_value + # _log.info("======== SOEHandler.Process") + # _log.info(f"info_gv {info_gv}") + # _log.info(f"visitor_ind_val {visitor_ind_val}") self._post_process(info_gv=info_gv, visitor_ind_val=visitor_ind_val) def _post_process(self, info_gv: GroupVariation, visitor_ind_val: List[Tuple[int, DbPointVal]]): @@ -173,10 +192,10 @@ def _post_process(self, info_gv: GroupVariation, visitor_ind_val: List[Tuple[int self._gv_last_poll_dict[info_gv] = datetime.datetime.now() def Start(self): - _log.debug('In SOEHandler.Start====') + self.logger.debug('In SOEHandler.Start====') def End(self): - _log.debug('In SOEHandler.End') + self.logger.debug('In SOEHandler.End') @property def gv_index_value_nested_dict(self) -> Dict[GroupVariation, Optional[Dict[int, DbPointVal]]]: @@ -190,6 +209,48 @@ def gv_ts_ind_val_dict(self): def gv_last_poll_dict(self) -> Dict[GroupVariation, Optional[datetime.datetime]]: return self._gv_last_poll_dict + @property + def db(self) -> dict: + """micmic DbHandler.db""" + self._consolidate_db() + return self._db + + @staticmethod + def init_db(size=10): + db = {} + for number, gv_name in zip([size, + size, + size, + size], + ["Analog", "AnalogOutputStatus", + "Binary", "BinaryOutputStatus"]): + val_body = dict((n, None) for n in range(number)) + db[gv_name] = val_body + + return db + + def _consolidate_db(self): + """map group variance to db with 4 keys: + "Binary", "BinaryOutputStatus", "Analog", "AnalogOutputStatus" + """ + pass + # for Analog + _db = {"Analog": self._gv_index_value_nested_dict.get(GroupVariation.Group30Var6)} + if _db.get("Analog"): + self._db.update(_db) + # for AnalogOutputStatus + _db = {"AnalogOutputStatus": self._gv_index_value_nested_dict.get(GroupVariation.Group40Var4)} + if _db.get("AnalogOutputStatus"): + self._db.update(_db) + # for Binary + _db = {"Binary": self._gv_index_value_nested_dict.get(GroupVariation.Group1Var2)} + if _db.get("Binary"): + self._db.update(_db) + # for Binary + _db = {"BinaryOutputStatus": self._gv_index_value_nested_dict.get(GroupVariation.Group10Var2)} + if _db.get("BinaryOutputStatus"): + self._db.update(_db) + def collection_callback(result=None): """ @@ -243,26 +304,6 @@ def parsing_gvid_to_gvcls(gvid: GroupVariationID) -> GroupVariation: assert gv_cls is not None except ValueError as e: _log.warning(f"Group{group}Var{variation} is not valid opendnp3.GroupVariation") - # if group == 30 and variation == 6: - # gv_cls = GroupVariation.Group30Var6 - # elif group == 30 and variation == 1: - # gv_cls = GroupVariation.Group30Var1 - # elif group == 1 and variation == 2: - # gv_cls = GroupVariation.Group1Var2 - # elif group == 40 and variation == 4: - # gv_cls = GroupVariation.Group40Var4 - # elif group == 4 and variation == 1: - # gv_cls = GroupVariation.Group40Var1 - # elif group == 10 and variation == 2: - # gv_cls = GroupVariation.Group10Var2 - # elif group == 32 and variation == 4: - # gv_cls = GroupVariation.Group32Var4 - # elif group == 2 and variation == 2: - # gv_cls = GroupVariation.Group2Var2 - # elif group == 42 and variation == 8: - # gv_cls = GroupVariation.Group42Var8 - # elif group == 11 and variation == 2: - # gv_cls = GroupVariation.Group11Var2 return gv_cls @@ -305,16 +346,13 @@ def parsing_gv_to_mastercmdtype(group: int, variation: int, val_to_set: DbPointV # alias -OutstationCmdType = Union[opendnp3.Analog, opendnp3.Binary, opendnp3.AnalogOutputStatus, - opendnp3.BinaryOutputStatus] # based on asiodnp3.UpdateBuilder.Update(**args) +# OutstationCmdType = Union[opendnp3.Analog, opendnp3.Binary, opendnp3.AnalogOutputStatus, +# opendnp3.BinaryOutputStatus] # based on asiodnp3.UpdateBuilder.Update(**args) # MasterCmdType = Union[opendnp3.AnalogOutputDouble64, # opendnp3.AnalogOutputFloat32, # opendnp3.AnalogOutputInt32, # opendnp3.AnalogOutputInt16, # opendnp3.ControlRelayOutputBlock] -MeasurementType = TypeVar("MeasurementType", bound=opendnp3.Measurement) # inheritance, e.g., opendnp3.Analog, - -# TODO: combine outstation util with master_utils def master_to_outstation_command_parser(master_cmd: MasterCmdType) -> OutstationCmdType: @@ -340,3 +378,66 @@ def master_to_outstation_command_parser(master_cmd: MasterCmdType) -> Outstation return opendnp3.BinaryOutputStatus(value=bi_value) else: raise ValueError(f"master_cmd {master_cmd} with type {type(master_cmd)} is not a valid command.") + + +class DBHandler: + """ + Work as an auxiliary database for outstation (Mimic SOEHAndler for master-station) + """ + + def __init__(self, stack_config=asiodnp3.OutstationStackConfig(opendnp3.DatabaseSizes.AllTypes(10)), + dbhandler_log_level=logging.INFO, *args, **kwargs): + + self.stack_config = stack_config + self._db: dict = self.config_db(stack_config) + + self.logger = logging.getLogger(self.__class__.__name__) + self.config_logger(log_level=dbhandler_log_level) + + def config_logger(self, log_level=logging.INFO): + self.logger.addHandler(stdout_stream) + self.logger.setLevel(log_level) + + @staticmethod + def config_db(stack_config): + db = {} + for number, gv_name in zip([stack_config.dbConfig.sizes.numBinary, + stack_config.dbConfig.sizes.numBinaryOutputStatus, + stack_config.dbConfig.sizes.numAnalog, + stack_config.dbConfig.sizes.numAnalogOutputStatus], + ["Analog", "AnalogOutputStatus", + "Binary", "BinaryOutputStatus"]): + val_body = dict((n, None) for n in range(number)) + db[gv_name] = val_body + + return db + + @property + def db(self) -> dict: + return self._db + + def process(self, command, index): + pass + # _log.info(f"command {command}") + # _log.info(f"index {index}") + update_body: dict = {index: command.value} + if self.db.get(command.__class__.__name__): + self.db[command.__class__.__name__].update(update_body) + else: + self.db[command.__class__.__name__] = update_body + # _log.info(f"========= self.db {self.db}") + + +class MyLogger(openpal.ILogHandler): + """ + Override ILogHandler in this manner to implement application-specific logging behavior. + """ + + def __init__(self): + super(MyLogger, self).__init__() + + def Log(self, entry): + filters = entry.filters.GetBitfield() + location = entry.location.rsplit('/')[-1] if entry.location else '' + message = entry.message + _log.debug('Log\tfilters={}\tlocation={}\tentry={}'.format(filters, location, message)) \ No newline at end of file diff --git a/src/dnp3demo/__main__.py b/src/dnp3demo/__main__.py index 383fae3..012aa59 100644 --- a/src/dnp3demo/__main__.py +++ b/src/dnp3demo/__main__.py @@ -1,21 +1,55 @@ -from dnp3demo import data_retrieval_demo +from dnp3demo import data_retrieval_demo, control_workflow_demo +from dnp3demo import run_master, run_outstation +import argparse def main(): - """Read the Real Python article feed""" + # Initialize parser + parser = argparse.ArgumentParser( + prog="dnp3demo", + description="Basic dnp3 use case demo", + # epilog="Thanks for using %(prog)s! :)", + ) - # If an article ID is given, then show the article - # if len(sys.argv) > 1: - # article = feed.get_article(sys.argv[1]) - # viewer.show(article) - # - # # If no ID is given, then show a list of all articles - # else: - # site = feed.get_site() - # titles = feed.get_titles() - # viewer.show_list(site, titles) - data_retrieval_demo.main() + # subcommand + # Note: by using `dest="command"`, we create namespace, such that args.command + # to access subcommand + subparsers = parser.add_subparsers(title="dnp3demo Sub-command", + # help='run-station sub-command help', + dest="command") + parser_master = subparsers.add_parser('master', help='run an interactive master') + # parser_master = subparsers + parser_master = run_master.setup_args(parser_master) + + parser_outstation = subparsers.add_parser('outstation', help='run an interactive outstation') + parser_outstation = run_outstation.setup_args(parser_outstation) + + # demo-subcommand (default) + parser_demo = subparsers.add_parser('demo', help='run dnp3 demo with default master and outstation', ) + subparser_group = parser_demo.add_mutually_exclusive_group(required=True) + subparser_group.add_argument("-dg", "--demo-get-point", action="store_true", + help="Demo get point workflow. (default)") + subparser_group.add_argument("-ds", "--demo-set-point", action="store_true", + help="Demo set point workflow.") + # read args + args = parser.parse_args() + + cmd = args.command + if cmd == "master": + run_master.main(parser=parser) + elif cmd == "outstation": + run_outstation.main(parser=parser) + elif cmd == "demo": + if args.demo_set_point: + control_workflow_demo.main() + else: + data_retrieval_demo.main() + elif cmd is None: # default behavior + data_retrieval_demo.main() if __name__ == "__main__": main() + print("============") + print("End of Demo.") + print("============") diff --git a/src/dnp3demo/control_workflow_demo.py b/src/dnp3demo/control_workflow_demo.py index f26e559..f0de202 100644 --- a/src/dnp3demo/control_workflow_demo.py +++ b/src/dnp3demo/control_workflow_demo.py @@ -134,7 +134,7 @@ def main(): # result = master_application.get_db_by_group_variation(group=30, variation=6) # print(f"===important log: case6b get_db_by_group_variation ==== {count}", datetime.datetime.now(), # result) - + # print("fffffffffffffffffffff", outstation_application.db_handler.db) _log.debug('Exiting.') master_application.shutdown() outstation_application.shutdown() diff --git a/src/dnp3demo/control_workflow_demo_master.py b/src/dnp3demo/control_workflow_demo_master.py new file mode 100644 index 0000000..39a2f1e --- /dev/null +++ b/src/dnp3demo/control_workflow_demo_master.py @@ -0,0 +1,111 @@ +import logging +import random +import sys + +from pydnp3 import opendnp3 +from dnp3_python.dnp3station.station_utils import command_callback +from dnp3_python.dnp3station.master_new import MyMasterNew +from dnp3_python.dnp3station.outstation_new import MyOutStationNew + +from time import sleep +import datetime +import json + +stdout_stream = logging.StreamHandler(sys.stdout) +stdout_stream.setFormatter(logging.Formatter('%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')) + +_log = logging.getLogger(__name__) +_log = logging.getLogger("control_workflow_demo") +_log.addHandler(stdout_stream) +_log.setLevel(logging.DEBUG) + + +def main(): + # cmd_interface_master = MasterCmd() + master_application = MyMasterNew( + port=20001, + outstation_id_int=2, + + # channel_log_level=opendnp3.levels.ALL_COMMS, + # master_log_level=opendnp3.levels.ALL_COMMS + # soe_handler=SOEHandler(soehandler_log_level=logging.DEBUG) + ) + master_application.start() + _log.debug('Initialization complete. Master Station in command loop.') + # cmd_interface_outstation = OutstationCmd() + # outstation_application = MyOutStationNew( + # # channel_log_level=opendnp3.levels.ALL_COMMS, + # # outstation_log_level=opendnp3.levels.ALL_COMMS + # ) + # outstation_application.start() + # _log.debug('Initialization complete. OutStation in command loop.') + + sleep(2) + print("============") + # info = master_application.stack_config.__dir__() + # info = master_application.stack_config.master.__dir__() + # info = master_application.stack_config.link.__dir__() + # info = master_application.listener.__dir__() + # info = master_application.channel.__dir__() + # info = master_application.channel.GetStatistics().channel.__dir__() + # + # print(info) + # print(master_application.channel.GetStatistics().channel.numOpen) + # print(master_application.channel.GetStatistics().channel.numOpenFail) + # print(master_application.channel.GetStatistics().channel.numClose) + + + # Note: if without sleep(2) there will be a glitch when first send_select_and_operate_command + # (i.e., all the values are zero, [(0, 0.0), (1, 0.0), (2, 0.0), (3, 0.0)])) + # since it would not update immediately + + # cmd_interface.startup() + count = 0 + while count < 1000: + # sleep(1) # Note: hard-coded, master station query every 1 sec. + + count += 1 + + print() + print("=================================================================") + print("Set AnalogOutput point") + print("Type in and . Separate with space, then hit ENTER.") + print("=================================================================") + print() + input_str = input() + try: + p_val = float(input_str.split(" ")[0]) + index = int(input_str.split(" ")[1]) + + master_application.send_direct_point_command(group=40, variation=4, index=index, val_to_set=p_val) + # master_application.get_db_by_group_variation(group=30, variation=6) + # master_application.get_db_by_group_variation(group=40, variation=4) + # master_application.send_scan_all_request() + # sleep(3) + + except Exception as e: + print(f"your input string '{input_str}'") + print(e) + + master_application.send_scan_all_request() + sleep(3) + + # db_print = json.dumps(master_application.soe_handler.db, indent=4, sort_keys=True) + db_print = master_application.soe_handler.db + # print(f"====== master database: {master_application.soe_handler.gv_index_value_nested_dict}") + print(f"====== master database: {db_print}") + # print("===== numOpen", master_application.channel.GetStatistics().channel.numOpen) + # print("===== numOpenFail", master_application.channel.GetStatistics().channel.numOpenFail) + # print("===== numClose", master_application.channel.GetStatistics().channel.numClose) + + print(master_application.get_config()) + print(master_application.is_connected) + print(master_application.channel_statistic) + + _log.debug('Exiting.') + master_application.shutdown() + # outstation_application.shutdown() + + +if __name__ == '__main__': + main() diff --git a/src/dnp3demo/data_retrieval_demo_master.py b/src/dnp3demo/data_retrieval_demo_master.py index 34e6a78..c4e735f 100644 --- a/src/dnp3demo/data_retrieval_demo_master.py +++ b/src/dnp3demo/data_retrieval_demo_master.py @@ -8,6 +8,8 @@ import datetime from time import sleep +import time + stdout_stream = logging.StreamHandler(sys.stdout) stdout_stream.setFormatter(logging.Formatter('%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')) @@ -19,15 +21,20 @@ # logging.basicConfig(filename='demo.log', level=logging.DEBUG) -def main(): + +def main(duration=300): master_application = MyMasterNew() master_application.start() _log.debug('Initialization complete. Master Station in command loop.') # outstation_application = MyOutStationNew() # _log.debug('Initialization complete. OutStation in command loop.') + start = time.time() + end = time.time() + count = 0 - while count < 20: + while count < 1000 and (end - start) < duration: + end = time.time() sleep(3) # Note: hard-coded, master station query every 1 sec. count += 1 @@ -119,6 +126,10 @@ def main(): print(f"===important log: case7c get_db_by_group_variation_index ==== {count}", datetime.datetime.now(), result) + # print(f"====== master database: {master_application.soe_handler.gv_ts_ind_val_dict}") + print(f"====== master database: {master_application.soe_handler.gv_index_value_nested_dict}") + print(f"====== master database: {master_application.soe_handler.db}") + _log.debug('Exiting.') master_application.shutdown() # outstation_application.shutdown() diff --git a/src/dnp3demo/data_retrieval_demo_outstation.py b/src/dnp3demo/data_retrieval_demo_outstation.py index 4aa2677..627a61f 100644 --- a/src/dnp3demo/data_retrieval_demo_outstation.py +++ b/src/dnp3demo/data_retrieval_demo_outstation.py @@ -7,6 +7,7 @@ from dnp3_python.dnp3station.outstation_new import MyOutStationNew from time import sleep +import time import datetime @@ -21,7 +22,7 @@ # _log.setLevel(logging.ERROR) -def main(): +def main(duration=300): # cmd_interface_master = MasterCmdNew() # master_application = MyMasterNew(log_handler=MyLogger(), # listener=AppChannelListener(), @@ -34,7 +35,12 @@ def main(): _log.debug('Initialization complete. OutStation in command loop.') count = 0 - while count < 20: + start = time.time() + end = time.time() + + count = 0 + while count < 1000 and (end - start) < duration: + end = time.time() sleep(5) # Note: hard-coded, master station query every 1 sec. count += 1 @@ -70,6 +76,7 @@ def main(): p_val = random.choice(pts) print(f"====== Outstation update index {i} with {p_val}") outstation_application.apply_update(opendnp3.Binary(True), i) + print(f"====== outstation database: {outstation_application.db_handler.db}") _log.debug('Exiting.') diff --git a/src/dnp3demo/run_master.py b/src/dnp3demo/run_master.py new file mode 100644 index 0000000..69618fb --- /dev/null +++ b/src/dnp3demo/run_master.py @@ -0,0 +1,173 @@ +import logging +import sys +import argparse +from dnp3_python.dnp3station.master_new import MyMasterNew +from time import sleep + + +stdout_stream = logging.StreamHandler(sys.stdout) +stdout_stream.setFormatter(logging.Formatter('%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')) + +_log = logging.getLogger(__name__) +_log = logging.getLogger("control_workflow_demo") +_log.addHandler(stdout_stream) +_log.setLevel(logging.DEBUG) + + +def input_prompt(display_str=None) -> str: + if display_str is None: + display_str = """ +======== Your Input Here: ======== +""" + return input(display_str) + + +def setup_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: + + # Adding optional argument + parser.add_argument("-mip", "--master-ip", action="store", default="0.0.0.0", type=str, + metavar="") + parser.add_argument("-oip", "--outstation-ip", action="store", default="127.0.0.1", type=str, + metavar="") + parser.add_argument("-p", "--port", action="store", default=20000, type=int, + metavar="") + parser.add_argument("-mid", "--master-id", action="store", default=2, type=int, + metavar="") + parser.add_argument("-oid", "--outstation-id", action="store", default=1, type=int, + metavar="") + + return parser + + +def print_menu(): + welcome_str = """\ +========================= MENU ================================== + - set analog-input point value + - set analog-output point value + - set binary-input point value + - set binary-output point value +
- display database + - display configuration +=================================================================\ +""" + print(welcome_str) + +def main(parser=None, *args, **kwargs): + + if parser is None: + # Initialize parser + parser = argparse.ArgumentParser( + prog="dnp3-master", + description="Run a dnp3 master", + # epilog="Thanks for using %(prog)s! :)", + ) + parser = setup_args(parser) + + # Read arguments from command line + args = parser.parse_args() + + # dict to store args.Namespace + d_args = vars(args) + print(__name__, d_args) + + master_application = MyMasterNew( + masterstation_ip_str=args.master_ip, + outstation_ip_str=args.outstation_ip, + port=args.port, + masterstation_id_int=args.master_id, + outstation_id_int=args.outstation_id, + + # channel_log_level=opendnp3.levels.ALL_COMMS, + # master_log_level=opendnp3.levels.ALL_COMMS + # soe_handler=SOEHandler(soehandler_log_level=logging.DEBUG) + ) + _log.info("Communication Config", master_application.get_config()) + master_application.start() + _log.debug('Initialization complete. Master Station in command loop.') + + sleep(3) + # Note: if without sleep(2) there will be a glitch when first send_select_and_operate_command + # (i.e., all the values are zero, [(0, 0.0), (1, 0.0), (2, 0.0), (3, 0.0)])) + # since it would not update immediately + + count = 0 + while count < 1000: + # sleep(1) # Note: hard-coded, master station query every 1 sec. + + count += 1 + print(f"=========== Count {count}") + + if master_application.is_connected: + # print("Communication Config", master_application.get_config()) + print_menu() + else: + print("Communication error.") + print("Communication Config", master_application.get_config()) + print("Start retry...") + sleep(2) + continue + + option = input_prompt() # Note: one of ["a", "b", "dd", "dc"] + while True: + if option == "a": + print("You chose - set analog-output point value") + print("Type in and . Separate with space, then hit ENTER.") + print("Type 'q', 'quit', 'exit' to main menu.") + input_str = input_prompt() + if input_str in ["q", "quit", "exit"]: + break + try: + p_val = float(input_str.split(" ")[0]) + index = int(input_str.split(" ")[1]) + master_application.send_direct_point_command(group=40, variation=4, index=index, val_to_set=p_val) + result = master_application.get_db_by_group_variation(group=40, variation=4) + print(result) + sleep(2) + except Exception as e: + print(f"your input string '{input_str}'") + print(e) + elif option == "b": + print("You chose - set binary-output point value") + print("Type in <[1/0]> and . Separate with space, then hit ENTER.") + input_str = input_prompt() + if input_str in ["q", "quit", "exit"]: + break + try: + p_val_input = input_str.split(" ")[0] + if p_val_input not in ["0", "1"]: + raise ValueError("binary-output value only takes '0' or '1'.") + else: + p_val = True if p_val_input == "1" else False + index = int(input_str.split(" ")[1]) + master_application.send_direct_point_command(group=10, variation=2, index=index, val_to_set=p_val) + result = master_application.get_db_by_group_variation(group=10, variation=2) + print(result) + sleep(2) + except Exception as e: + print(f"your input string '{input_str}'") + print(e) + elif option == "dd": + print("You chose < dd > - display database") + master_application.send_scan_all_request() + sleep(1) + db_print = master_application.soe_handler.db + print(db_print) + sleep(2) + break + elif option == "dc": + print("You chose < dc > - display configuration") + print(master_application.get_config()) + sleep(3) + break + else: + print(f"ERROR- your input `{option}` is not one of the following.") + sleep(1) + break + + _log.debug('Exiting.') + master_application.shutdown() + # outstation_application.shutdown() + + +if __name__ == '__main__': + main() diff --git a/src/dnp3demo/run_outstation.py b/src/dnp3demo/run_outstation.py new file mode 100644 index 0000000..12b96a1 --- /dev/null +++ b/src/dnp3demo/run_outstation.py @@ -0,0 +1,210 @@ +import logging +import sys +import argparse + +from pydnp3 import opendnp3 +from dnp3_python.dnp3station.outstation_new import MyOutStationNew + +from time import sleep + +stdout_stream = logging.StreamHandler(sys.stdout) +stdout_stream.setFormatter(logging.Formatter('%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')) + +_log = logging.getLogger(__name__) +_log = logging.getLogger("control_workflow_demo") +_log.addHandler(stdout_stream) +_log.setLevel(logging.DEBUG) + + +def input_prompt(display_str=None) -> str: + if display_str is None: + display_str = """ +======== Your Input Here: ======== +""" + return input(display_str) + + +def setup_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: + + # Adding optional argument + parser.add_argument("-mip", "--master-ip", action="store", default="0.0.0.0", type=str, + metavar="") + parser.add_argument("-oip", "--outstation-ip", action="store", default="127.0.0.1", type=str, + metavar="") + parser.add_argument("-p", "--port", action="store", default=20000, type=int, + metavar="") + parser.add_argument("-mid", "--master-id", action="store", default=2, type=int, + metavar="") + parser.add_argument("-oid", "--outstation-id", action="store", default=1, type=int, + metavar="") + + return parser + + +def print_menu(): + welcome_str = """\ +========================= MENU ================================== + - set analog-input point value + - set analog-output point value + - set binary-input point value + - set binary-output point value +
- display database + - display configuration +=================================================================\ +""" + print(welcome_str) + +def main(parser=None, *args, **kwargs): + + if parser is None: + # Initialize parser + parser = argparse.ArgumentParser( + prog="dnp3-outstation", + description="Run a dnp3 outstation", + # epilog="Thanks for using %(prog)s! :)", + ) + parser = setup_args(parser) + + # Read arguments from command line + args = parser.parse_args() + + # dict to store args.Namespace + d_args = vars(args) + print(__name__, d_args) + + outstation_application = MyOutStationNew( + masterstation_ip_str=args.master_ip, + outstation_ip_str=args.outstation_ip, + port=args.port, + masterstation_id_int=args.master_id, + outstation_id_int=args.outstation_id, + + # channel_log_level=opendnp3.levels.ALL_COMMS, + # master_log_level=opendnp3.levels.ALL_COMMS + # soe_handler=SOEHandler(soehandler_log_level=logging.DEBUG) + ) + _log.info("Communication Config", outstation_application.get_config()) + outstation_application.start() + _log.debug('Initialization complete. Outstation in command loop.') + + sleep(3) + # Note: if without sleep(2) there will be a glitch when first send_select_and_operate_command + # (i.e., all the values are zero, [(0, 0.0), (1, 0.0), (2, 0.0), (3, 0.0)])) + # since it would not update immediately + + count = 0 + while count < 1000: + # sleep(1) # Note: hard-coded, master station query every 1 sec. + + count += 1 + # print(f"=========== Count {count}") + + if outstation_application.is_connected: + # print("Communication Config", master_application.get_config()) + print_menu() + else: + print("Communication error.") + print("Communication Config", outstation_application.get_config()) + print("Start retry...") + sleep(2) + continue + + option = input_prompt() # Note: one of ["ai", "ao", "bi", "bo", "dd", "dc"] + while True: + if option == "ai": + print("You chose - set analog-input point value") + print("Type in and . Separate with space, then hit ENTER.") + print("Type 'q', 'quit', 'exit' to main menu.") + input_str = input_prompt() + if input_str in ["q", "quit", "exit"]: + break + try: + p_val = float(input_str.split(" ")[0]) + index = int(input_str.split(" ")[1]) + outstation_application.apply_update(opendnp3.Analog(value=p_val), index) + result = {"Analog": outstation_application.db_handler.db.get("Analog")} + print(result) + sleep(2) + except Exception as e: + print(f"your input string '{input_str}'") + print(e) + elif option == "ao": + print("You chose - set analog-output point value") + print("Type in and . Separate with space, then hit ENTER.") + print("Type 'q', 'quit', 'exit' to main menu.") + input_str = input_prompt() + if input_str in ["q", "quit", "exit"]: + break + try: + p_val = float(input_str.split(" ")[0]) + index = int(input_str.split(" ")[1]) + outstation_application.apply_update(opendnp3.AnalogOutputStatus(value=p_val), index) + result = {"AnalogOutputStatus": outstation_application.db_handler.db.get("AnalogOutputStatus")} + print(result) + sleep(2) + except Exception as e: + print(f"your input string '{input_str}'") + print(e) + elif option == "bi": + print("You chose - set binary-input point value") + print("Type in <[1/0]> and . Separate with space, then hit ENTER.") + input_str = input_prompt() + if input_str in ["q", "quit", "exit"]: + break + try: + p_val_input = input_str.split(" ")[0] + if p_val_input not in ["0", "1"]: + raise ValueError("binary-output value only takes '0' or '1'.") + else: + p_val = True if p_val_input == "1" else False + index = int(input_str.split(" ")[1]) + outstation_application.apply_update(opendnp3.Binary(value=p_val), index) + result = {"Binary": outstation_application.db_handler.db.get("Binary")} + print(result) + sleep(2) + except Exception as e: + print(f"your input string '{input_str}'") + print(e) + elif option == "bo": + print("You chose - set binary-output point value") + print("Type in <[1/0]> and . Separate with space, then hit ENTER.") + input_str = input_prompt() + if input_str in ["q", "quit", "exit"]: + break + try: + p_val_input = input_str.split(" ")[0] + if p_val_input not in ["0", "1"]: + raise ValueError("binary-output value only takes '0' or '1'.") + else: + p_val = True if p_val_input == "1" else False + index = int(input_str.split(" ")[1]) + outstation_application.apply_update(opendnp3.BinaryOutputStatus(value=p_val), index) + result = {"BinaryOutputStatus": outstation_application.db_handler.db.get("BinaryOutputStatus")} + print(result) + sleep(2) + except Exception as e: + print(f"your input string '{input_str}'") + print(e) + elif option == "dd": + print("You chose < dd > - display database") + db_print = outstation_application.db_handler.db + print(db_print) + sleep(2) + break + elif option == "dc": + print("You chose < dc> - display configuration") + print(outstation_application.get_config()) + sleep(3) + break + else: + print(f"ERROR- your input `{option}` is not one of the following.") + sleep(1) + break + + _log.debug('Exiting.') + outstation_application.shutdown() + # outstation_application.shutdown() + + +if __name__ == '__main__': + main()