diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 228126b..6c19f4a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,7 @@ repos: - id: pyupgrade args: ["--py3-plus"] - repo: https://github.com/pycqa/isort - rev: 5.11.4 + rev: 5.12.0 hooks: - id: isort name: isort (python) diff --git a/README.md b/README.md index e4669a8..518a366 100644 --- a/README.md +++ b/README.md @@ -54,9 +54,9 @@ python3 –m pip install . ### `WebRelayPreselector` Configuration The `WebRelayPreselector` requires a [SigMF metadata file](https://Github.com/NTIA/sigmf-ns-ntia) -that describes the sensor preselector and a config file to describe the x310 settings for -the RF paths specified in the metadata and for any other desired sources. Below is an -example config file for the `WebRelayPreselector` to describe how it works: +that describes the sensor preselector and a config file to describe the WebRelay +settings for the RF paths specified in the metadata and for any other desired sources. +Below is an example config file for the `WebRelayPreselector` to describe how it works: ```json { @@ -71,23 +71,50 @@ example config file for the `WebRelayPreselector` to describe how it works: "noise diode powered" : "relay2=1", "antenna path enabled": "relay1=0", "noise diode path enabled": "relay1=1" + }, + "sensors": { + "internal_temp": 1, + "internal_humidity": 2, + "tec_intake_temp": 3, + "tec_exhaust_temp": 4 + }, + "digital_inputs": { + "ups_power": 1, + "ups_battery_level": 2, + "ups_trouble": 3, + "ups_battery_replace": 4 + }, + "analog_inputs": { + "door_sensor": 1, + "5vdc_monitor": 2, + "28vdc_monitor": 3, + "15vdc_monitor": 4, + "24vdc_monitor": 5 } } ``` +Note, the config above is specifically for a prelector with a `ControlByWebWebRelay.` +Other Preselectors and WebRelays may require a different configuration. The `base_url` and `name` keys are the only required keys for the `WebRelayPreselector`. The `base_url` should map to the base URL to interact with the WebRelay (see [https://www.controlbyweb.com/x310](https://www.controlbyweb.com/x310) -for more info). The keys within the control_states key should correspond to RF paths -documented in the SigMF metadata. The keys within the status_states should map to the +for more info). The keys within the `control_states` key should correspond to RF paths +documented in the SigMF metadata. The keys within the `status_states` should map to the RF paths documented in the SigMF metadata, or to understandable states of the preselector for which it is desired to determine whether they are enabled or disabled. -The status method of the preselector will provide each of the keys specified in the -status_states entry mapped to a boolean indicating whether the preselector states match +The `get_status` method of the preselector will provide each of the keys specified in the +`status_states` entry mapped to a boolean indicating whether the preselector states match those specified in the mapping. Each of the entries in the config provide mappings to the -associated web relay input states and every RFPath defined in the sensor definition json -file should have an entry in the preselector config. The keys in the dictionary may use the -name of the RFPath or the index of the RFPath in the RFPaths array. +associated web relay input states and every `RFPath` defined in the sensor definition json +file should have an entry in the preselector config. +The `sensors`, `digital_inputs`, and `analog_inputs` keys define the sensors, +digital inputs and analog inputs configured on the device. Within each of the sections, +each key provides the name of the sensor or input and the value specifies the assigned +sensor or input number. The `get_satus` method will provide each sensor/input value with +the specified label. Every status_state, sensor, and input must have a unique name. +Attempting to create a`ControlByWebWebRelay` with duplicate status_states, +sensors, or inputs will cause a `ConfigurationException.` In this example, there are `noise_diode_on` and `noise_diode_off` keys to correspond to the preselector paths to turn the noise diode on and off, and an antenna key to indicate the diff --git a/config/config.json b/config/config.json index 85f7c8a..12215d5 100644 --- a/config/config.json +++ b/config/config.json @@ -9,6 +9,25 @@ "noise diode powered" : "relay2=1", "antenna path enabled": "relay1=0", "noise diode path enabled": "relay1=1" + }, + "sensors": { + "internal_temp": 1, + "internal_humidity": 2, + "tec_intake_temp": 3, + "tec_exhaust_temp": 4 + }, + "digital_inputs": { + "ups_power": 1, + "ups_battery_level": 2, + "ups_trouble": 3, + "ups_battery_replace": 4 + }, + "analog_inputs": { + "door_sensor": 1, + "5vdc_monitor": 2, + "28vdc_monitor": 3, + "15vdc_monitor": 4, + "24vdc_monitor": 5 } } diff --git a/src/its_preselector/__init__.py b/src/its_preselector/__init__.py index 131942e..f5f41e5 100644 --- a/src/its_preselector/__init__.py +++ b/src/its_preselector/__init__.py @@ -1 +1 @@ -__version__ = "3.0.2" +__version__ = "3.1.0" diff --git a/src/its_preselector/controlbyweb_web_relay.py b/src/its_preselector/controlbyweb_web_relay.py index d1a65b1..21ad154 100644 --- a/src/its_preselector/controlbyweb_web_relay.py +++ b/src/its_preselector/controlbyweb_web_relay.py @@ -35,6 +35,24 @@ def __init__(self, config: dict, timeout: int = 1, retries=3): elif config["name"] == "": raise ConfigurationException("name cannot be blank.") self.retries = retries + self.check_for_unique_names(config) + + def check_for_unique_names(self, config: dict): + names = {} + self.check_and_add_keys("status_states", config, names) + self.check_and_add_keys("sensors", config, names) + self.check_and_add_keys("analog_inputs", config, names) + self.check_and_add_keys("digital_inputs", config, names) + + def check_and_add_keys(self, key_type: str, config: dict, unique_names: dict): + if key_type in config: + for key, value in config[key_type].items(): + if key in unique_names: + raise ConfigurationException( + "All sensors and inputs must have unique names" + ) + else: + unique_names[key] = key def get_sensor_value(self, sensor_num: int) -> float: """ @@ -45,7 +63,7 @@ def get_sensor_value(self, sensor_num: int) -> float: :return: The desired sensor value. """ sensor_num_string = str(sensor_num) - response = self.request_with_retry(self.base_url) + response = self.get_state_xml() # Check for X310 xml format first. sensor_tag = "sensor" + sensor_num_string root = ET.fromstring(response.text) @@ -70,7 +88,7 @@ def get_digital_input_value(self, input_num: int) -> bool: :return: The boolean value of the desired digital input. """ input_num = int(input_num) - response = self.request_with_retry(self.base_url) + response = self.get_state_xml() # Check for X310 format first input_tag = f"input{input_num}state" root = ET.fromstring(response.text) @@ -83,6 +101,22 @@ def get_digital_input_value(self, input_num: int) -> bool: raise ConfigurationException(f"Digital Input {input_num} does not exist.") return bool(int(digital_input.text)) + def get_analog_input_value(self, input_num: int) -> float: + """ + Read float value from an analog input of the WebRelay. + + :param input_num: Configured index of the desired analog input. + :raises ConfigurationException: If the requested analog input cannot be read. + :return: The desired analog input value. + """ + response = self.get_state_xml() + input_tag = f"analogInput{input_num}" + root = ET.fromstring(response.text) + sensor = root.find(input_tag) + if sensor is None: + raise ConfigurationException(f"Analog input {input_tag} does not exist.") + return float(sensor.text) + def set_state(self, key): """ Set the state of the relay. @@ -143,6 +177,27 @@ def get_status(self): for relay_state in relay_states: matches = matches and self.state_matches(relay_state, xml_root) state[key] = matches + + if "sensors" in self.config: + for key, value in self.config["sensors"].items(): + try: + state[key] = self.get_sensor_value(value) + except: + logger.error( + f"Unable to get sensor value for sensor:{value}" + ) + if "digital_inputs" in self.config: + for key, value in self.config["digital_inputs"].items(): + try: + state[key] = self.get_digital_input_value(value) + except: + logger.error(f"Unable to read digital input:{value}") + if "analog_inputs" in self.config: + for key, value in self.config["analog_inputs"].items(): + try: + state[key] = self.get_analog_input_value(value) + except: + logger.error(f"Unable to read analog input:{value}") except: logger.error("Unable to get status") state["healthy"] = healthy diff --git a/src/its_preselector/web_relay.py b/src/its_preselector/web_relay.py index a5aac0d..4a53de2 100644 --- a/src/its_preselector/web_relay.py +++ b/src/its_preselector/web_relay.py @@ -11,24 +11,33 @@ def __init__(self, config: dict, timeout: int = 1): self.timeout = timeout @abstractmethod - def get_sensor_value(sensor) -> float: + def get_sensor_value(self, sensor) -> float: """ Read the value from a 1-Wire sensor on the web relay. - :param sensor: The name or ID of the sensor. + :param sensor: The ID of the sensor. :return: The float value read from the sensor, e.g. the temperature. """ pass @abstractmethod - def get_digital_input_value(input) -> bool: + def get_digital_input_value(self, input_id) -> bool: """ Read the value from a digital input on the web relay. - :param input: The name or ID of the digital input + :param input_id: The ID of the digital input :return: The boolean value read from the digital input. """ + @abstractmethod + def get_analog_input_value(self, input_id) -> float: + """ + Read the value from an analog input on the web relay. + + :param input_id: The ID of the analog input + :return: The float value read from the analog input. + """ + @abstractmethod def set_state(self, state_key: str) -> None: """ diff --git a/tests/test_controlbyweb_web_relay.py b/tests/test_controlbyweb_web_relay.py index d57696e..9292271 100644 --- a/tests/test_controlbyweb_web_relay.py +++ b/tests/test_controlbyweb_web_relay.py @@ -4,6 +4,7 @@ import defusedxml.ElementTree as ET from requests import Response, codes +from its_preselector.configuration_exception import ConfigurationException from its_preselector.controlbyweb_web_relay import ControlByWebWebRelay @@ -22,6 +23,7 @@ def setUpClass(cls): "0" "27.6" "0" + "1.4" "102.3" "9160590" "-25200" @@ -30,6 +32,27 @@ def setUpClass(cls): "" ) + def test_requires_unique_sensors_and_inputs(self): + with self.assertRaises(ConfigurationException): + relay = ControlByWebWebRelay( + { + "base_url": "127.0.0.1", + "name": "test_preselector", + "control_states": { + "noise_diode_off": "1State=1,2State=0,3State=0,4State=0" + }, + "status_states": { + "noise diode powered": "relay2=1", + "antenna path enabled": "relay1=0", + "noise diode path enabled": "relay1=1", + "noise on": "relay2=1,relay1=1", + "measurements": "relay1=0,relay2=0,relay3=0,relay4=0", + }, + "sensors": {"duplicate": 1}, + "analog_inputs": {"duplicate": 1}, + } + ) + def test_is_enabled(self): web_relay = ControlByWebWebRelay( {"base_url": "127.0.0.1", "name": "test_switch"} @@ -87,6 +110,83 @@ def test_get_state_from_config(self): self.assertTrue(states["noise diode path enabled"]) self.assertTrue(states["noise on"]) + def test_get_sensor_value(self): + root = ET.fromstring(self.state) + web_relay = ControlByWebWebRelay( + { + "base_url": "http://127.0.0.1", + "name": "test_preselector", + "control_states": { + "noise_diode_off": "1State=1,2State=0,3State=0,4State=0" + }, + "status_states": { + "noise diode powered": "relay2=1", + "antenna path enabled": "relay1=0", + "noise diode path enabled": "relay1=1", + "noise on": "relay2=1,relay1=1", + "measurements": "relay1=0,relay2=0,relay3=0,relay4=0", + }, + } + ) + response = Response() + response.status_code = codes.ok + type(response).text = PropertyMock(return_value=self.state) + web_relay.get_state_xml = MagicMock(return_value=response) + sensor_value = web_relay.get_sensor_value(1) + self.assertEqual(102.3, sensor_value) + + def test_get_digital_input(self): + root = ET.fromstring(self.state) + web_relay = ControlByWebWebRelay( + { + "base_url": "http://127.0.0.1", + "name": "test_preselector", + "control_states": { + "noise_diode_off": "1State=1,2State=0,3State=0,4State=0" + }, + "status_states": { + "noise diode powered": "relay2=1", + "antenna path enabled": "relay1=0", + "noise diode path enabled": "relay1=1", + "noise on": "relay2=1,relay1=1", + "measurements": "relay1=0,relay2=0,relay3=0,relay4=0", + }, + "analog_inputs": {"analogInputTest": 1}, + } + ) + response = Response() + response.status_code = codes.ok + type(response).text = PropertyMock(return_value=self.state) + web_relay.get_state_xml = MagicMock(return_value=response) + input_val = web_relay.get_digital_input_value(1) + self.assertEqual(False, input_val) + + def test_get_analog_input(self): + root = ET.fromstring(self.state) + web_relay = ControlByWebWebRelay( + { + "base_url": "http://127.0.0.1", + "name": "test_preselector", + "control_states": { + "noise_diode_off": "1State=1,2State=0,3State=0,4State=0" + }, + "status_states": { + "noise diode powered": "relay2=1", + "antenna path enabled": "relay1=0", + "noise diode path enabled": "relay1=1", + "noise on": "relay2=1,relay1=1", + "measurements": "relay1=0,relay2=0,relay3=0,relay4=0", + }, + "analog_inputs": {"analogInputTest": 1}, + } + ) + response = Response() + response.status_code = codes.ok + type(response).text = PropertyMock(return_value=self.state) + web_relay.get_state_xml = MagicMock(return_value=response) + analogInputVal = web_relay.get_analog_input_value(1) + self.assertEqual(1.4, analogInputVal) + def test_get_status(self): web_relay = ControlByWebWebRelay( {