diff --git a/dashboard.py b/dashboard.py index 69644316..d2156fba 100644 --- a/dashboard.py +++ b/dashboard.py @@ -11,19 +11,19 @@ frames_config = [ - ("Oil System", 0, 50, 150), - ("Visualization Gas Control", 0, 50, 150), - ("System Checks", 0, None, None), - ("Beam Extraction", 0, None, None), - ("Vacuum System", 1, 150, 300), - ("Deflection Monitor", 1, None, None), - ("Beam Pulse", 1, None, None), - ("Main Control", 1, 50, 300), - ("Setup Script", 2, None, 25), - ("Interlocks", 2, None, 25), - ("High Voltage Warning", 2, None, 25), - ("Environmental", 3, 150, 450), - ("Cathode Heating", 3, 960, 450), + ("Interlocks", 0, None, 2), # Moved to the top row + ("Oil System", 1, 50, 150), + ("Visualization Gas Control", 2, 50, 150), + ("System Checks", 1, None, None), + ("Beam Extraction", 1, None, None), + ("Vacuum System", 2, 150, 300), + ("Deflection Monitor", 2, None, None), + ("Beam Pulse", 2, None, None), + ("Main Control", 2, 50, 300), + ("Setup Script", 3, None, 25), + ("High Voltage Warning", 3, None, 25), + ("Environmental", 4, 150, 450), + ("Cathode Heating", 4, 960, 450), ] class EBEAMSystemDashboard: @@ -33,6 +33,10 @@ def __init__(self, root, com_ports): self.root.title("EBEAM Control System Dashboard") + # if save file exists call it and open it + if saveFileExists(): + self.load_saved_pane_state() + # if save file exists call it and open it if saveFileExists(): self.load_saved_pane_state() @@ -65,6 +69,7 @@ def setup_main_pane(self): def create_frames(self): """Create frames for different systems and controls within the dashboard.""" global frames_config + global frames_config for title, row, width, height in frames_config: if width and height and title: @@ -73,7 +78,8 @@ def create_frames(self): else: frame = tk.Frame(borderwidth=1, relief="solid") self.rows[row].add(frame, stretch='always') - self.add_title(frame, title) + if title != "Interlocks": + self.add_title(frame, title) self.frames[title] = frame if title == "Setup Script": SetupScripts(frame) @@ -145,14 +151,22 @@ def add_title(self, frame, title): label = tk.Label(frame, text=title, font=("Helvetica", 10, "bold")) label.pack(pady=0, fill=tk.X) + # saves data to file when button is pressed # saves data to file when button is pressed def save_current_pane_state(self): save_pane_states(frames_config, self.frames, self.main_pane) + save_pane_states(frames_config, self.frames, self.main_pane) + # gets data in save config file (as dict) and updates the global var of frames_config # gets data in save config file (as dict) and updates the global var of frames_config def load_saved_pane_state(self): savedData = load_pane_states() + for i in range(len(frames_config)): + if frames_config[i][0] in savedData: + frames_config[i] = (frames_config[i][0], frames_config[i][1], savedData[frames_config[i][0]][0],savedData[frames_config[i][0]][1]) + savedData = load_pane_states() + for i in range(len(frames_config)): if frames_config[i][0] in savedData: frames_config[i] = (frames_config[i][0], frames_config[i][1], savedData[frames_config[i][0]][0],savedData[frames_config[i][0]][1]) @@ -191,11 +205,13 @@ def create_subsystems(self): logger=self.logger ), 'Interlocks': subsystem.InterlocksSubsystem( - self.frames['Interlocks'], - logger=self.logger + self.frames['Interlocks'], + com_ports = self.com_ports['Interlocks'], + logger=self.logger, + frames = self.frames ), 'Oil System': subsystem.OilSubsystem( - self.frames['Oil System'], + self.frames['Oil System'], logger=self.logger ), 'Cathode Heating': subsystem.CathodeHeatingSubsystem( @@ -207,8 +223,8 @@ def create_subsystems(self): def create_messages_frame(self): """Create a frame for displaying messages and errors.""" - self.messages_frame = MessagesFrame(self.rows[3]) - self.rows[3].add(self.messages_frame.frame, stretch='always') + self.messages_frame = MessagesFrame(self.rows[4]) + self.rows[4].add(self.messages_frame.frame, stretch='always') self.logger = self.messages_frame.logger def create_com_port_frame(self, parent_frame): @@ -225,7 +241,7 @@ def create_com_port_frame(self, parent_frame): self.port_selections = {} self.port_dropdowns = {} - for subsystem in ['VTRXSubsystem', 'CathodeA PS', 'CathodeB PS', 'CathodeC PS', 'TempControllers']: + for subsystem in ['VTRXSubsystem', 'CathodeA PS', 'CathodeB PS', 'CathodeC PS', 'TempControllers', 'Interlocks']: frame = ttk.Frame(self.com_port_menu) frame.pack(fill=tk.X, padx=5, pady=2) ttk.Label(frame, text=f"{subsystem}:").pack(side=tk.LEFT) diff --git a/instrumentctl/G9DriverFlowChart.png b/instrumentctl/G9DriverFlowChart.png new file mode 100644 index 00000000..0997f9af Binary files /dev/null and b/instrumentctl/G9DriverFlowChart.png differ diff --git a/instrumentctl/README.md b/instrumentctl/README.md new file mode 100644 index 00000000..192dfae1 --- /dev/null +++ b/instrumentctl/README.md @@ -0,0 +1,298 @@ +# G9 Driver Documentation: + + +## Purpose and functionality: + + +The OMRON G9SP is the safety controller being used for the safety system to analyze the input from the machine along with controlling the High voltage(HVOLT) for the printer. +This driver is made to be able to communicate through a serial connection to the G9SP to pull and update the GUI on the current status of the interlocks, along will handle and present the user with any current errors that the G9SP is experiencing. + + +Libraries / Imports: + + +PySerial (serial) - allows us to be able to send and receive data through the serial port on the G9 and the driver + + +LogLevel from utils - allows the ability for the driver to send errors or important information to the log for the user to see + + +## Communications: + + +### Serial Port: + + +The OMRON G9SP uses a proprietary pin lay out for serial communications: + + +**OMRON Standard** + + + + +| Pin | Abbr. | Signal | Signal Direction | +|-----|---------|-----------------------|------------------| +| 1 | FG | Frame ground | --- | +| 2 | SD (TXD)| Send data | Outputs | +| 3 | RD (RXD)| Receive data | Inputs | +| 4 | RS (RTS)| Request to send | Outputs | +| 5 | CS (CTS)| Clear to send | Inputs | +| 6 | 5 V | Power | --- | +| 7 | DR (DSR)| Data set ready | Inputs | +| 8 | ER (DTR)| Data terminal ready | Outputs | +| 9 | SG (0 V)| Signal ground | --- | +| Connector hood | FG | Frame ground | --- | + + +Compared to the Standard + + +**RS232 Standard** + + +| Pin | Signal Name | Abbreviation | Direction | +|-----|-------------|--------------|-----------| +| 1 | Carrier Detect | CD | Input | +| 2 | Receive Data | RD (RXD) | Input | +| 3 | Transmit Data | TD (TXD) | Output | +| 4 | Data Terminal Ready | DTR | Output | +| 5 | Signal Ground | SG | --- | +| 6 | Data Set Ready | DSR | Input | +| 7 | Request to Send | RTS | Output | +| 8 | Clear to Send | CTS | Input | +| 9 | Ring Indicator | RI | Input | + + +So the needed cable requires these connections to translate the OMRON Standard -> R232 Standard + + + + +| OMRON Pin | RS-232 Pin | Description | +|-----------|------------|-------------------------------------------------------------------------| +| 1 | 1 | Both connections need to be shielded | +| 2 | 3 | Connects OMRON's Send Data (TXD) to RS-232's Send Data (TXD) | +| 3 | 2 | Connects OMRON's Receive Data (RXD) to RS-232's Receive Data (RXD) | +| 4 -> 5 | --- | Jumps OMRON's Request to Send (RTS) to Clear to Send (CTS) (required) | +| --- | 7 -> 8 | Jumps RS-232's Request to Send (RTS) to Clear to Send (CTS) (optional) | +| 9 | 9 | Signal Ground to Signal Ground | + + +**Pyserial Configuration** + + +For the Pyserial port to effectively communicate with the safety controller, the following arguments are required when defining the serial object: + + +| Argument | Value | +|-----------|---------------------------------------------------------------------------------------| +| Port | The port that the serial connection is using on the driver side (e.g., COM##) | +| Baudrate | 9600 or 115200 bps (depends on the 3rd DIP switch setting on the controller) | +| Parity | Needs to be set to "even" | +| Stop Bits | 1 bit | +| Byte Size | 8 bits | +| Timeout | 300 ms (the G9SP controller is expected to respond within 300 ms) | + + +With this configuration the commands write and read are used for communications: + + +```python +serial_connection.write(data: bytes) -> int + + +serial_connection.read(size: int) -> bytes + + +serial_connection.read_until(data: bytes) -> bytes +``` + + +### Packages: + + +#### From Driver to G9SP: + + + + +| Byte Offset | Value | Size | Description | +|-------------|----------------|-------|-----------------------------------| +| +0 | `0x40` | 1 byte | Fixed value | +| +1 | `0x00` | 1 byte | Fixed value | +| +2 | `0x00` | 1 byte | Fixed value | +| +3 | `0x0F` | 1 byte | Fixed value | +| +4 | `0x4B` | 1 byte | Fixed value | +| +5 | `0x03` | 1 byte | Fixed value | +| +6 | `0x4D` | 1 byte | Fixed value | +| +7 | `0x00` | 1 byte | Fixed value | +| +8 | `0x01` | 1 byte | Fixed value | +| +9 | Data | 6 bytes | Data payload | +| +15 | Checksum (H) | 1 byte | High byte of the checksum | +| +16 | Checksum (L) | 1 byte | Low byte of the checksum | +| +17 | `0x2A` | 1 byte | Fixed value | +| +18 | `0x0D` | 1 byte | Fixed value | + + +For our driver the implementation for the Data section of the safety controller was not needed. To be able to request the Optional Communication data, when programmed in the controller, this functionality will need to be added. + + +The checksum requires the summation of bytes from byte 0 to 14 (inclusive), storing the most significant values in Checksum (H) and the least Significant bytes in Checksum (L). Thus without the use of the data section the checksum should always be python ``` b"\x00\xEB" ```. + + +#### From G9SP to Driver: + + + + +| Byte Offset | Value | Size | Description | +|-------------|-----------------|--------|--------------------------------------| +| +0 | `0x40` | 1 byte | Fixed value | +| +1 | Response length (HL): `0x00` | 1 byte | High byte of response length | +| +2 | Response length (LH): `0x00` | 1 byte | Low byte of response length | +| +3 | Response length (LL) | 1 byte | Actual length of the response | +| +4 | End code (H) | 2 bytes | High byte of the end code | +| +5 | End code (L) | 2 bytes | Low byte of the end code | +| +6 | Service code | 1 byte | Indicates the type of response | +| +7 | Data | 188 bytes | Main data payload | +| +195 | Checksum (H) | 1 byte | High byte of checksum | +| +196 | Checksum (L) | 1 byte | Low byte of checksum | +| +197 | `0x2A` | 1 byte | Fixed value | +| +198 | `0x0D` | 1 byte | Fixed value | + + +--- + + +| Response Length (LL) | Description | Rest of Message | +|------------------------------|---------------------------|--------------------------------------------------------------------------------------------------| +| Normal response | `0xC3` | End Code: `0x0000`
Service Code: `0xCB`
Data Section: Refer to detailed Data Section below | +| Error response | `0x09` | End Code: `0x0000`
Service Code: `0x94`
Data Section: Contains 2 bytes of reserved data | +| Incorrect command format | `0x06` | End Code: `0x0000`
Service Code: Not included in response
Data Section: Not included in response | + + + + +Checksum same as above, summation of bytes 0 to 194 (inclusive). + + +### Parsing response Package: + + +**Data Section** + + +| Offset | Name | Size | Description | +|--------|---------------------------------------|---------|--------------------------------------------| +| +0 | Optional Communications Transmission Data | 4 bytes | Data related to optional communications. | +| +4 | Safety Input Terminal Data Flags | 6 bytes | Flags indicating safety input terminal data status. | +| +10 | Safety Output Terminal Data Flags | 4 bytes | Flags indicating safety output terminal data status. | +| +14 | Safety Input Terminal Status Flags | 6 bytes | Flags for current status of safety input terminals. | +| +20 | Safety Output Terminal Status Flags | 4 bytes | Flags for current status of safety output terminals. | +| +24 | Safety Input Terminal Error Causes | 24 bytes | Error causes for safety input terminals. | +| +48 | Safety Output Terminal Error Causes | 16 bytes | Error causes for safety output terminals. | +| +64 | Reserved | 2 bytes | Reserved for future use. | +| +66 | Unit Status | 2 bytes | Overall status of the G9SP unit. | +| +68 | Configuration ID | 2 bytes | ID for the current configuration. | +| +70 | Unit Conduction Time | 4 bytes | Time since the unit started conducting. | +| +74 | Reserved | 20 bytes| Reserved for future use. | +| +94 | Present Error Information | 12 bytes | Information on current errors. | +| +106 | Error Log Count | 1 byte | Number of entries in the error log. | +| +107 | Operation Log Count | 1 byte | Number of entries in the operation log. | +| +108 | Error Log | 40 bytes | Detailed log of recent errors. | +| +148 | Operation Log | 40 bytes | Detailed log of recent operations. | + + + + +### Inputs / Outputs: +The OMRON G9SP controller is able to be configured to have a total of 20 inputs and 16 outputs. This is important to consider when parsing the response data because not considering these constraints given the data would lead to parsing the reserve sections which would lead to undefined behavior. + + +### Data Flags +Each input or output receives one bit: + +| Value (Binary) | Meaning | +|----------------|---------------------------------------------| +| 0b0 | Terminal OFF (or error for inputs) | +| 0b1 | Terminal ON | + +### Status Flags +Each input or output receives one bit: + +| Value (Binary) | Meaning | +|----------------|----------------------------------| +| 0b0 | Error | +| 0b1 | Normal operation (no error) | + +Errors in this section are caused by disconnected lines, ground faults, or short circuits. The specific cause of the error is detailed in the **Error Causes** sections below. + +### Error Causes + +#### Inputs + +Each input is assigned a nibble (4 bits) to indicate the cause of an error. The table below lists the hexadecimal representation of the nibble and the corresponding error meaning. + +| Value (Hex) | Meaning | +|-------------|-------------------------------------| +| 0x0 | No error | +| 0x1 | Invalid configuration | +| 0x2 | External test signal failure | +| 0x3 | Internal circuit error | +| 0x4 | Discrepancy error | +| 0x5 | Failure of the associated dual-channel input | + +#### Outputs + +Each output is assigned a nibble (4 bits) to indicate the cause of an error. The table below lists the hexadecimal representation of the nibble and the corresponding error meaning. + +| Value (Hex) | Meaning | +|-------------|----------------------------------------| +| 0x0 | No error | +| 0x1 | Invalid configuration | +| 0x2 | Overcurrent detection | +| 0x3 | Short circuit detection | +| 0x4 | Stuck-at-high detection | +| 0x5 | Failure of the associated dual-channel output | +| 0x6 | Internal circuit error | +| 0x8 | Dual channel violation | + +### Unit Status + +The Unit Status section of the package has 2 bytes, of which contain 4 flags that indicate the overall status of the controller. The rest of the bits are reserved. + +| Bit Position | Description | Meaning | +|--------------|--------------------------------|------------------------------------------------------------------------------------------| +| 0 | Unit Normal Operation Flag | 0 : Error occurred or program stopped
1 : Normal status | +| 9 | Output Power Supply Error Flag | 0 : Output power supply voltage normal
1 : Output power supply voltage error or OFF | +| 10 | Safety I/O Terminal Error Flag | 0 : No error in Safety I/O terminals
1 : Error in Safety I/O terminals | +| 13 | Function Block Error Flag | 0 : No error in any function block
1 : Error in a function block | + + +### Code Flow Diagram for `g9_driver.py` + +![alt text](G9DriverFlowChart.png) + +https://tinyurl.com/mr2wudtv (Updated 11/5) + + + + + + + + + + + + + + + + + + + + + diff --git a/instrumentctl/g9_driver.py b/instrumentctl/g9_driver.py new file mode 100644 index 00000000..bad07db0 --- /dev/null +++ b/instrumentctl/g9_driver.py @@ -0,0 +1,378 @@ +# g9_driver.py +import serial +import threading +import queue +import time +from utils import LogLevel + +class G9Driver: + NUMIN = 13 + + # Constants for protocol + SNDHEADER = b'\x40\x00\x00\x0F\x4B\x03\x4D\x00\x01' + SNDDATA = b'\x00\x00\x00\x00' + SNDRES = b'\x00\x00' + RECHEADER = b'\x40\x00\x00' + FOOTER = b'\x2A\x0D' + ALWAYS_START_BYTE = b'\x40' + EXPECTED_RESPONSE_LENGTH = b'\xC3' + EXPECTED_DATA_LENGTH = 199 # bytes + + # Offsets for data extraction + OCTD_OFFSET = 7 # Optional Communications Transmission Data + US_OFFSET = 73 # Unit Status + SITDF_OFFSET = 11 # Safety Input Terminal Data Flags + SOTDF_OFFSET = 17 # Safety Output Terminal Data Flags + SITSF_OFFSET = 21 # Safety Input Terminal Status Flags + SOTSF_OFFSET = 27 # Safety Output Terminal Status Flags + SOTEC_OFFSET = 55 # Safety Output Terminal Error Causes + SITEC_OFFSET = 31 # Safety Input Terminal Error Causes + CHECKSUM_HIGH = 195 # G9 Response Checksum + CHECKSUM_LOW = 196 # G9 Response Checksum + + # Status dictionaries + IN_STATUS = { + 0: "No error", + 1: "Invalid configuration", + 2: 'External test signal failure', + 3: 'Internal circuit error', + 4: 'Discrepancy error', + 5: 'Failure of the associated dual-channel input' + } + + OUT_STATUS = { + 0: 'No error', + 1: 'Invalid configuration', + 2: 'Overcurrent detection', + 3: 'Short circuit detection', + 4: 'Stuck-at-high detection', + 5: 'Failure of the associated dual-channel output', + 6: 'Internal circuit error', + 8: 'Dual channel violation' + } + + US_STATUS = { + 9: "Output Power Supply Error Flag", + 10: "Safety I/O Terminal Error Flag", + 13: "Function Block Error Flag" + } + + def __init__(self, port=None, baudrate=9600, timeout=0.5, logger=None, debug_mode=False): + self.logger = logger + self.debug_mode = debug_mode + self._setup_serial(port, baudrate, timeout) + self.last_data = None + self.input_flags = [] + self._lock = threading.Lock() + self._response_queue = queue.Queue(maxsize=1) + self._running = True + self._thread = threading.Thread(target=self._communication_thread, daemon=True) + self._thread.start() + + def _setup_serial(self, port, baudrate, timeout): + """ + Attempts to make a serial connection + + Catch: + SerialException: If initizlization of serial port fails + """ + if port: + try: + self.ser = serial.Serial( + port=port, + baudrate=baudrate, + parity=serial.PARITY_EVEN, + stopbits=serial.STOPBITS_ONE, + bytesize=serial.EIGHTBITS, + timeout=timeout + ) + self.log(f"Serial connection established on {port}", LogLevel.INFO) + except serial.SerialException as e: + self.ser = None + self.log(f"Failed to open serial port {port}: {str(e)}", LogLevel.ERROR) + else: + self.ser = None + self.log("No port specified", LogLevel.WARNING) + + def _communication_thread(self): + """Background thread for handling serial communication""" + while self._running: + try: + with self._lock: + if not self.is_connected(): + time.sleep(0.1) + continue + + self._send_command() + response_data = self._read_response() # blocking until complete or timeout + if response_data: + + result = self._process_response(response_data) + + # clear queue if it has a old response + try: + self._response_queue.get_nowait() + except queue.Empty: + pass + + self._response_queue.put(result) + + except Exception as e: + self.log(f"Communication thread error: {str(e)}", LogLevel.ERROR) + #TODO: this might be solved with the comport detection, but if not might what to define this somewhere else + self._response_queue.queue[0] = ([0] * 13, [0] * 13, 0) + time.sleep(0.5) # back off on errors + + time.sleep(0.1) # minimum sleep between successful reads + + + def get_interlock_status(self): + """ + Non-blocking method to get the latest interlock status + Returns None if no data is available or on error + """ + try: + return self._response_queue.queue[0] + except queue.Empty: + self.log(f"No interlock information is here; Queue is Empty", LogLevel.WARNING) + return None + + def _send_command(self): + """ + Creates message for G9, sends it through serial connection + + Catch: + SerialException: If sending messages throws an error + + Raisw: + ConnectionError: Throws when sending message throws error + """ + message = self.SNDHEADER + self.SNDDATA + self.SNDRES + checksum = self._calculate_checksum(message, 14) + full_message = message + checksum + self.FOOTER + + try: + self.ser.write(full_message) + except serial.SerialException as e: + self.log(f"Error sending command: {str(e)}", LogLevel.ERROR) + raise ConnectionError(f"Failed to send command: {str(e)}") + + def _read_response(self): + """ + Read and validate response from G9SP device. + + Catch: + SerialException: If reading messages throws an error + + Raise: + ConnectionError: If serial port is not open + ValueError: For various validation failures + """ + try: + data = self.ser.read_until(self.FOOTER) + + if len(data) != self.EXPECTED_DATA_LENGTH: + length_error_msg = f"Invalid response length: got: {len(data)}, expected 199 bytes" + self.log(length_error_msg, LogLevel.ERROR) + raise ValueError(length_error_msg) + + self._validate_response_format(data) + self._validate_checksum(data) + + return data + + except serial.SerialException as e: + raise ConnectionError(f"Error reading response: {str(e)}") + + def _process_response(self, data): + """ + Process validated response and extract interlock data + + Return: + Bit representation of the I/O Data flags + """ + if data == None: + raise ValueError("Invalid inputs to _process_response: Data is None") + # Extract status data + status_data = { + 'unit_status': data[self.US_OFFSET:self.US_OFFSET + 2], + 'sitdf': data[self.SITDF_OFFSET:self.SITDF_OFFSET + 6], + 'sitsf': data[self.SITSF_OFFSET:self.SITSF_OFFSET + 6], + 'sotdf': data[self.SOTDF_OFFSET:self.SOTDF_OFFSET + 4], + 'sotsf': data[self.SOTSF_OFFSET:self.SOTSF_OFFSET + 4] + } + + # Convert to binary strings + binary_data = { + 'sitdf': self._extract_flags(status_data['sitdf'], self.NUMIN), + 'sitsf': self._extract_flags(status_data['sitsf'], self.NUMIN), + 'sotdf': self._extract_flags(status_data['sotdf'], 7), + 'sotsf': self._extract_flags(status_data['sotsf'], 7) + } + self.log(f"Safety Output Terminal Data Flags: {binary_data['sotdf']}", LogLevel.DEBUG) + + # Check for errors + self._check_unit_status(status_data['unit_status']) + self._check_safety_inputs(data) + self._check_safety_outputs(data) + + return binary_data['sitsf'], binary_data['sitdf'], binary_data['sotsf'][4] & binary_data['sotdf'][4] + + def _validate_response_format(self, data): + """ + Validate basic response format + + Raise: + ValueError: if formate is not as expected + """ + if data == None: + raise ValueError("Invalid inputs to _validate_response_format: Data is None") + if data[0:1] != self.ALWAYS_START_BYTE: + raise ValueError(f"Invalid start byte: {data[0:1].hex()}") + if data[1:3] != b'\x00\x00': + raise ValueError(f"Invalid response length bytes: {data[1:3].hex()}") + if data[3:4] != self.EXPECTED_RESPONSE_LENGTH: + raise ValueError(f"Incorrect response length indicator: {data[3:4].hex()}") + if data[-2:] != self.FOOTER: + raise ValueError(f"Invalid footer: {data[-2:].hex()}") + + def _calculate_checksum(self, data, bytes): + """ + Args: + data (bytes): The complete message bytes + start (int): Starting index for checksum calculation (default 0) + end (int): Ending index for checksum calculation (default 194) pg. 115 + + Return: + bytes: Two-byte checksum value + """ + if data == None: + raise ValueError("Invalid inputs to _calculate_checksum: Data is None") + checksum = sum(data[0:bytes + 1]) & 0xFFFF + return checksum.to_bytes(2, 'big') + + def _validate_checksum(self, data): + """ + Validate checksum of received data + + Raise: + ValueError: Calculated check sum does not match + """ + if data == None: + raise ValueError("Invalid inputs to _validate_checksum: Data is None") + + # Extract the received checksum (bytes 195-196) + received = data[self.CHECKSUM_HIGH:self.CHECKSUM_LOW + 1] # 1349 + + # Calculate expected checksum (bytes 0-194) + expected = self._calculate_checksum(data, 194) #1255 + if received != expected: + raise ValueError( + f"G9 Checksum failed. " + f"Expectation: expected {expected.hex()}, " + f" Received: {received.hex()}" + ) + + def _check_unit_status(self, status): + """ + Check unit status and raise error if issues found + + Raise: + ValueError: When Error Flag is found in unit status + """ + if status == None: + raise ValueError("Invalid inputs to _check_unit_status: status is None") + if status != b'\x01\x00': + bits = self._extract_flags(status, 16) + self.log(f"Unit status bits: {bits}", LogLevel.VERBOSE) + for k in self.US_STATUS.keys(): + if bits[k] == 1: + self.log(f"Unit State Error: {self.US_STATUS[k]}", LogLevel.CRITICAL) + if bits[0] == 0: + self.log("Unit State Error: Normal Operation Error Flag", LogLevel.CRITICAL) + + def _check_safety_inputs(self, data): + """Check safety input status""" + if data == None: + raise ValueError("Invalid inputs to _check_safety_inputs: Data is None") + self._check_terminal_status( + data[self.SITEC_OFFSET:self.SITEC_OFFSET + 24][-10:], + self.IN_STATUS, + "Input" + ) + + def _check_safety_outputs(self, data): + """Check safety output status""" + if data == None: + raise ValueError("Invalid inputs to _check_safety_outputs: Data is None") + self._check_terminal_status( + data[self.SOTEC_OFFSET:self.SOTEC_OFFSET + 16][-10:], + self.OUT_STATUS, + "Output" + ) + + def _check_terminal_status(self, data, status_dict, terminal_type): + """ + Generic terminal status checker + + Raise: + ValueError: If an error is found in the Error Cause Data or with invalid inputs + """ + if data == None or status_dict == None or terminal_type == None or terminal_type == "": + raise ValueError(f"_check_terminal_status is being called with invalid inputs {data} {status_dict} {terminal_type}") + + for i, byte in enumerate(reversed(data[:self.NUMIN])): + msb = byte >> 4 + lsb = byte & 0x0F + + for nibble, position in [(msb, 'H'), (lsb, 'L')]: + if nibble in status_dict and nibble != 0: + raise ValueError( + f"{terminal_type} error at byte {i}{position}: " + f"{status_dict[nibble]} (code {nibble})" + ) + + + # helper function to convert bytes to bits for checking flags + # not currently being used but many be helpful in the future for getting errors + def _bytes_to_binary(self, byte_string): + return ''.join(format(byte, '08b') for byte in byte_string) + + # this just makes sure that the ser object is considered to be valid + def is_connected(self): + return self.ser is not None and self.ser.is_open + + def _extract_flags(self, byte_string, num_bits): + """ + Extracts num_bits from the data + the bytes are order in big-endian meaning the first 8 are on top + but the bits in the bye are ordered in little-endian 7 MSB and 0 LSB + + Raise: + ValueError: When called requesting more bits than in the bytes + Return: + num_bits array - MSB is 0 signal LSB if (num_bits - 1)th bit (aka little endian) + """ + num_bytes = (num_bits + 7) // 8 + + if len(byte_string) < num_bytes: + raise ValueError(f"Input must contain at least {num_bytes} bytes; recieved {len(byte_string)}") + + extracted_bits = [] + for byte_index in range(num_bytes): + byte = byte_string[byte_index] + bits_to_extract = min(8, num_bits - (byte_index * 8)) + extracted_bits.extend(((byte >> i) & 1) for i in range(bits_to_extract - 1, -1, -1)[::-1]) + + return extracted_bits[:num_bits] + + def log(self, message, level=LogLevel.INFO): + """Log a message with the specified level if a logger is configured.""" + if self.logger: + self.logger.log(message, level) + elif self.debug_mode: + print(f"{level.name}: {message}") + + + #TODO: Figure out how to handle all the errors (end task) + #TODO: add a function to keep track of the driver uptime\ \ No newline at end of file diff --git a/main.py b/main.py index bdce888c..927762f1 100644 --- a/main.py +++ b/main.py @@ -22,7 +22,7 @@ def config_com_ports(saved_com_ports): except ImportError: pass - subsystems = ['VTRXSubsystem', 'CathodeA PS', 'CathodeB PS', 'CathodeC PS', 'TempControllers'] + subsystems = ['VTRXSubsystem', 'CathodeA PS', 'CathodeB PS', 'CathodeC PS', 'TempControllers', 'Interlocks'] available_ports = [port.device for port in serial.tools.list_ports.comports()] if not available_ports: @@ -46,6 +46,7 @@ def config_com_ports(saved_com_ports): main_frame.pack(side=tk.TOP, fill=tk.X) # Create a dropdown for each subsystem + subsystems = ['VTRXSubsystem', 'CathodeA PS', 'CathodeB PS', 'CathodeC PS', 'TempControllers', 'Interlocks'] for subsystem in subsystems: frame = ttk.Frame(main_frame) frame.pack(pady=5, anchor='center') diff --git a/media/redOff.png b/media/redOff.png new file mode 100644 index 00000000..b8651866 Binary files /dev/null and b/media/redOff.png differ diff --git a/scripts/G9Drivertest/__init__.py b/scripts/G9Drivertest/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scripts/G9Drivertest/errorTests.py b/scripts/G9Drivertest/errorTests.py new file mode 100644 index 00000000..82fd63ec --- /dev/null +++ b/scripts/G9Drivertest/errorTests.py @@ -0,0 +1,200 @@ +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + +import unittest +from unittest.mock import MagicMock, patch +from instrumentctl.g9_driver import G9Driver + +class TestG9Driver(unittest.TestCase): + # Sample response data - modified versions for different test cases + BASE_RESPONSE = bytearray([ + 0x40, 0x00, 0x00, 0xC3, # Header + # Bytes 4-10: Initial padding + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + # SITDF (Safety Input Terminal Data Flags) - 6 bytes + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + # SOTDF (Safety Output Terminal Data Flags) - 4 bytes + 0xFF, 0xFF, 0xFF, 0xFF, + # SITSF (Safety Input Terminal Status Flags) - 6 bytes + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + # SOTSF (Safety Output Terminal Status Flags) - 4 bytes + 0xFF, 0xFF, 0xFF, 0xFF + ] + [0x00] * 145 + [0x2A, 0x0D]) # Padding + Footer + + def setUp(self): + self.driver = G9Driver() + self.driver.ser = MagicMock() + # Set normal operation status in BASE_RESPONSE + self.BASE_RESPONSE[self.driver.US_OFFSET:self.driver.US_OFFSET + 2] = b'\x00\x01' + + def create_response_with_checksum(self, base_message): + """Helper to create a response with valid checksum""" + message_without_checksum = base_message[:-4] + checksum = self.driver._calculate_checksum(message_without_checksum, 194) + return message_without_checksum + checksum + self.driver.FOOTER + + def test_normal_response_processing(self): + """Test processing of a normal response with all systems operational""" + msg = bytearray(self.BASE_RESPONSE) + msg[self.driver.US_OFFSET:self.driver.US_OFFSET + 2] = b'\x00\x01' # Normal unit status + msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + + self.driver.ser.read_until.return_value = msg_with_checksum + + try: + sitsf, sitdf = self.driver._process_response(msg_with_checksum) + self.assertEqual(len(sitsf), self.driver.NUMIN) + self.assertEqual(len(sitdf), self.driver.NUMIN) + self.assertTrue(all(bit == 1 for bit in sitsf)) + self.assertTrue(all(bit == 1 for bit in sitdf)) + except ValueError as e: + self.fail(f"process_response() raised ValueError unexpectedly: {str(e)}") + + def test_unit_status_error(self): + """Test detection of unit status errors""" + pass + # # Ensure msg is a bytearray to allow modification + # msg = bytearray(self.BASE_RESPONSE) + # # Set Output Power Supply Error Flag (bit 9) + # msg[self.driver.US_OFFSET:self.driver.US_OFFSET + 2] = b'\xFF\xFF' + + # # Convert to bytes only when needed for checksum creation + # msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + + # with self.assertRaises(ValueError) as context: + # self.driver._process_response(msg_with_checksum) + # self.assertIn("Output Power Supply Error Flag", str(context.exception)) + + + + def test_safety_input_error(self): + """Test detection of safety input terminal errors""" + msg = bytearray(self.BASE_RESPONSE) + # Fill input error section with zeros first + msg[self.driver.SITEC_OFFSET:self.driver.SITEC_OFFSET + 24] = bytes([0] * 24) + # Set error in the last 10 bytes of the input section + error_section = msg[self.driver.SITEC_OFFSET:self.driver.SITEC_OFFSET + 24] + # Put error code 3 (Internal circuit error) at start of last 10 bytes + msg[self.driver.SITEC_OFFSET + 14] = 0x30 # Position error at start of last 10 bytes + msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + + with self.assertRaises(ValueError) as context: + self.driver._process_response(msg_with_checksum) + self.assertIn("Internal circuit error", str(context.exception)) + + def test_safety_output_error(self): + """Test detection of safety output terminal errors""" + msg = bytearray(self.BASE_RESPONSE) + # Fill output error section with zeros first + msg[self.driver.SOTEC_OFFSET:self.driver.SOTEC_OFFSET + 16] = bytes([0] * 16) + # Set error code 2 (Overcurrent detection) at start of last 10 bytes + msg[self.driver.SOTEC_OFFSET + 6] = 0x20 + msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + + with self.assertRaises(ValueError) as context: + self.driver._process_response(msg_with_checksum) + self.assertIn("Overcurrent detection", str(context.exception)) + + def test_response_format_validation(self): + """Test validation of response format""" + # Test invalid start byte + msg = bytearray(self.BASE_RESPONSE) + msg[0] = 0x41 # Wrong start byte + msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + + with self.assertRaises(ValueError) as context: + self.driver._validate_response_format(msg_with_checksum) + self.assertIn("Invalid start byte", str(context.exception)) + + def test_checksum_validation(self): + """Test checksum validation""" + msg = bytes(self.BASE_RESPONSE) + # Corrupt the message after calculating valid checksum + msg_with_checksum = self.create_response_with_checksum(msg) + corrupted_msg = bytearray(msg_with_checksum) + corrupted_msg[10] = 0xFF # Change a byte in the message + + with self.assertRaises(ValueError) as context: + self.driver._validate_checksum(corrupted_msg) + self.assertIn("Checksum failed", str(context.exception)) + + #TODO: change this test since the unit status now does not raise an error + # def test_complex_error_combinations(self): + # """Test multiple simultaneous error conditions""" + # msg = bytearray(self.BASE_RESPONSE) + # # Set multiple errors + # # msg[self.driver.US_OFFSET:self.driver.US_OFFSET + 2] = b'\x02\x00' # Unit status error + # msg[self.driver.SITEC_OFFSET] = 0x30 # Input error + # msg[self.driver.SOTEC_OFFSET] = 0x20 # Output error + # msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + + # with self.assertRaises(ValueError) as context: + # self.driver._process_response(msg_with_checksum) + # # Should raise the first error it encounters + # self.assertIn("Output Power Supply Error Flag", str(context.exception)) + + def test_input_terminal_status_checker(self): + """Test the input terminal status checker with various error codes""" + test_cases = [ + (0x10, "Invalid configuration"), + (0x20, "External test signal failure"), + (0x30, "Internal circuit error"), + (0x40, "Discrepancy error"), + (0x50, "Failure of the associated dual-channel input") + ] + + for error_code, expected_message in test_cases: + msg = bytearray(self.BASE_RESPONSE) + # Fill with zeros first + msg[self.driver.SITEC_OFFSET:self.driver.SITEC_OFFSET + 24] = bytes([0] * 24) + # Place error code in last byte + msg[self.driver.SITEC_OFFSET + 23] = error_code + msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + + with self.assertRaises(ValueError) as context: + self.driver._process_response(msg_with_checksum) + self.assertIn(expected_message, str(context.exception)) + + def test_output_terminal_status_checker(self): + """Test the output terminal status checker with various error codes""" + test_cases = [ + (0x10, "Invalid configuration"), + (0x20, "Overcurrent detection"), + (0x30, "Short circuit detection"), + (0x40, "Stuck-at-high detection"), + (0x50, "Failure of the associated dual-channel output"), + (0x60, "Internal circuit error"), + (0x80, "Dual channel violation") + ] + + for error_code, expected_message in test_cases: + msg = bytearray(self.BASE_RESPONSE) + # Fill with zeros first + msg[self.driver.SOTEC_OFFSET:self.driver.SOTEC_OFFSET + 16] = bytes([0] * 16) + # Place error code in last byte + msg[self.driver.SOTEC_OFFSET + 15] = error_code + msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + msg_with_checksum = self.create_response_with_checksum(bytes(msg)) + + with self.assertRaises(ValueError) as context: + self.driver._process_response(msg_with_checksum) + self.assertIn(expected_message, str(context.exception)) + + def test_calculate_checksum(self): + """Test the checksum calculation for a known data message.""" + # Create a sample message with known bytes + test_data = b'@\x00\x00\xc3\x00\x00\xcb\x00\x00\x00\x00\xfc\x0f\x00\x00\x00\x00E\x00\x00\x00\xff\xff\x0f\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x12\x00\x9a\x08~\x15\x00\x0020000012X17M\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\n\n?\x00\x15~?\x00\x15l?\x00\x15f?\x00\x15`?\x00\x15`?\x00\x15`?\x00\x15H?\x00\x15H?\x00\x15H?\x00\x15B\x06\x00\x15f\x01\x00\x15f\x06\x00\x15`\x01\x00\x15`\x06\x00\x15B\x01\x00\x15B\x06\x00\x15B\x01\x00\x15B\x06\x00\x15\x1e\x01\x00\x15\x1e\x14\xf6*\r' + + # Verify that the calculated checksum is correct + cal = self.driver._calculate_checksum(test_data, 194) + self.assertEqual(test_data[-4:-2], cal, + f""" + Checksum calculation did not match expected value: calculated {self.driver._calculate_checksum(test_data, 194)}; + expected {test_data[-4:-2]} + """) + + + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/G9Drivertest/g9_driver_test_qa.ipynb b/scripts/G9Drivertest/g9_driver_test_qa.ipynb new file mode 100644 index 00000000..1ef29217 --- /dev/null +++ b/scripts/G9Drivertest/g9_driver_test_qa.ipynb @@ -0,0 +1,202 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "def extract_safety_input_flags(byte_string):\n", + " # Ensure input is at least two bytes for Si 00 to Si 13\n", + " if len(byte_string) < 2:\n", + " raise ValueError(\"Input must contain at least 2 bytes\")\n", + "\n", + " # Extract Si 00 to Si 07 from byte 0, reverse order\n", + " byte_0 = byte_string[0]\n", + " si_00_to_si_07 = [((byte_0 >> i) & 1) for i in range(7, -1, -1)]\n", + "\n", + " # Extract Si 08 to Si 13 from byte 1, reverse order for the first 6 bits\n", + " byte_1 = byte_string[1]\n", + " si_08_to_si_13 = [((byte_1 >> i) & 1) for i in range(7, -1, -1)]\n", + "\n", + " # Combine results for Si 00 to Si 13\n", + " si_00_to_si_13 = si_00_to_si_07[::-1] + si_08_to_si_13[::-1]\n", + " print(len(si_00_to_si_07), si_00_to_si_07)\n", + " print(len(si_08_to_si_13), si_08_to_si_13)\n", + " return si_00_to_si_13\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def extract_safety_input_flags(byte_string, num_bits):\n", + " num_bytes = (num_bits + 7)\n", + " \n", + " if len(byte_string) < num_bytes:\n", + " raise ValueError(f\"Input must contain at least {num_bytes} bytes\")\n", + "\n", + " extracted_bits = []\n", + " for byte_index in range(num_bytes):\n", + " byte = byte_string[byte_index]\n", + " bits_to_extract = min(8, num_bits - (byte_index * 8))\n", + " extracted_bits.extend(((byte >> i) & 1) for i in range(bits_to_extract - 1, -1, -1)[::-1])\n", + "\n", + " return extracted_bits[:num_bits]" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "8 [1, 1, 1, 1, 0, 1, 0, 1]\n", + "8 [0, 0, 0, 1, 0, 1, 1, 0]\n", + "16 [1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 1]\n" + ] + } + ], + "source": [ + "# in: 1111010100010110 \\xf5\\x16\n", + "# out: 1010111101101\n", + "i = b'\\xf5\\x16'\n", + "o = extract_safety_input_flags(i)\n", + "print(len(o), o[:13])" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "False" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "o[:] == list(\"1010111101100\")" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 1]" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "extract_safety_input_flags(i, 13)" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "199" + ] + }, + "execution_count": 31, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "msg = b'@\\x00\\x00\\xc3\\x00\\x00\\xcb\\x00\\x00\\x00\\x00\\xfc\\x0f\\x00\\x00\\x00\\x00E\\x00\\x00\\x00\\xff\\xff\\x0f\\x00\\x00\\x00\\xff\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x08\\x00\\x12\\x00\\x9a\\x08~\\x15\\x00\\x0020000012X17M\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00@\\x00\\x00\\x00\\x00\\n\\n?\\x00\\x15~?\\x00\\x15l?\\x00\\x15f?\\x00\\x15`?\\x00\\x15`?\\x00\\x15`?\\x00\\x15H?\\x00\\x15H?\\x00\\x15H?\\x00\\x15B\\x06\\x00\\x15f\\x01\\x00\\x15f\\x06\\x00\\x15`\\x01\\x00\\x15`\\x06\\x00\\x15B\\x01\\x00\\x15B\\x06\\x00\\x15B\\x01\\x00\\x15B\\x06\\x00\\x15\\x1e\\x01\\x00\\x15\\x1e\\x14\\xf6*\\r'\n", + "len(msg)" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'\\x14\\xf6'" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "check = msg[-4:-2]\n", + "check" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'\\x1e\\x14\\xf6*\\r'" + ] + }, + "execution_count": 28, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/scripts/G9Drivertest/responcseTest.py b/scripts/G9Drivertest/responcseTest.py new file mode 100644 index 00000000..65fcab7d --- /dev/null +++ b/scripts/G9Drivertest/responcseTest.py @@ -0,0 +1,32 @@ +# data = b'@\x00\x00\xc3\x00\x00\xcb\x00\x00\x00\x00\xff\x0f\x00\x00\x00\x00$\x00\x00\x00\xff\xff\x0f\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x13\x00\x9a\x08B\x15\x00\x0020000012X17M\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\n\n?\x00\x15= (13 * norm):\n", + " # all flags we care about are 1\n", + " return True\n", + " else:\n", + " return False\n", + "\n", + "\n", + "\n", + " #TODO: how do we want to handle the data \n", + " def response(self):\n", + " if not self.is_connected():\n", + " raise ConnectionError(\"Seiral Port is Not Open.\")\n", + " \n", + " data = self.ser.read(size=199)\n", + " self.lastResponse = data\n", + " if len(data) == 199:\n", + " alwaysHeader = data[0:3]\n", + " alwaysFooter = data[-2:]\n", + " print(alwaysFooter, alwaysHeader)\n", + " if alwaysHeader != b'\\x40\\x00\\x00' or alwaysFooter != b'\\x2A\\x0D':\n", + " raise ValueError(\"Always bits are incorrect\")\n", + " OCTD = data[7:11]\n", + " print(\"OCTD: \", OCTD)\n", + " if OCTD != self.msgOptData:\n", + " raise ValueError(\"Optional Transmission data doesn't match data sent to the G9SP\")\n", + "\n", + "\n", + " SITDF = data[11:17]\n", + " if not self.check_flags13(SITDF):\n", + " #TODO: add a function to check which input\n", + " raise ValueError(\"An input is either off or throwing an error\")\n", + "\n", + "\n", + " SOTDF = data[17:21]\n", + " if not self.check_flags13(SOTDF):\n", + " #TODO: add a function to check which output\n", + " raise ValueError(\"An output is either off or throwing an error\")\n", + "\n", + "\n", + " SITSF = data[21:27]\n", + " if not self.check_flags13(SITSF):\n", + " if self.safety_in_terminal_error(data[31:55]):\n", + " raise ValueError(\"Error was detected in inputs but was not found\")\n", + " \n", + " SOTSF = data[27:31]\n", + " if not self.check_flags13(SOTSF):\n", + " if self.safety_out_terminal_error(data[55:71]):\n", + " raise ValueError(\"Error was detected in outputs but was not found\")\n", + " \n", + "\n", + " # US - Unit Status\n", + " US = data[73:75]\n", + " print(\"US: \", US)\n", + " if US != 0:\n", + " if self.unit_state_error(US):\n", + " raise ValueError(\"Error was detected in Unit State but was not identified. Could be more than one\")\n", + " \n", + " \n", + " # # TODO: Need to add error log\n", + " # errorLog = data[108:149]\n", + "\n", + " # # TODO: Need to add operation log\n", + " # operationLog = data[148:199]\n", + " \n", + " else:\n", + " self.sendCommand()\n", + "\n", + " pass\n", + "\n", + "\n", + " \"\"\"\n", + " 0: No error\n", + " 1: Invalid configuration\n", + " 2: External test signal failure\n", + " 3: Internal circuit error\n", + " 4: Discrepancy error\n", + " 5: Failure of the associated dual-channel input\n", + " \"\"\"\n", + "\n", + " # checks all the SITSFs, throws error is one is found\n", + " def safety_in_terminal_error(self, data):\n", + " if len(data) != 24:\n", + " raise ValueError(f\"Expected 24 bytes, but received {len(data)}.\")\n", + "\n", + " last_bytes = data[-13:]\n", + " last_bytes = last_bytes[::-1]\n", + "\n", + " for i, byte in enumerate(last_bytes):\n", + " msb = byte >> 4 # most sig bits\n", + " lsb = byte & 0x0F # least sig bits\n", + "\n", + " # check high bits for errors\n", + " if msb in inStatus and msb != 0:\n", + " raise ValueError(f\"Error at byte {i}H, MSB: {inStatus[msb]} (code {msb})\")\n", + " # check low bits for errors\n", + " if lsb in inStatus and lsb != 0:\n", + " raise ValueError(f\"Error at byte {i}L, LSB: {inStatus[lsb]} (code {lsb})\")\n", + " return True\n", + " \n", + "\n", + "\n", + " \"\"\"\n", + " 0: No error\n", + " 1: Invalid configuration\n", + " 2: Overcurrent detection\n", + " 3: Short circuit detection\n", + " 4: Stuck-at-high detection\n", + " 5: Failure of the associated dual-channel output\n", + " 6: Internal circuit error\n", + " 8: Dual channel violation\n", + " \"\"\"\n", + "\n", + " # checks all the SOTSFs, throws error is one is found \n", + " def safety_out_terminal_error(self, data, inputs = 13):\n", + " if len(data) != 16:\n", + " raise ValueError(f\"Expected 16 bytes, but received {len(data)}.\")\n", + "\n", + " # only keep needs bytes\n", + " last_bytes = data[-inputs:]\n", + " # flip direction so enumerate can if us the byte number in the error\n", + " last_bytes = last_bytes[::-1]\n", + "\n", + " for i, byte in enumerate(last_bytes):\n", + " msb = byte >> 4 # most sig bits\n", + " lsb = byte & 0x0F # least sig bits\n", + "\n", + " # check high bits for errors\n", + " if msb in outStatus and msb != 0:\n", + " raise ValueError(f\"Error at byte {i}H, MSB: {outStatus[msb]} (code {msb})\")\n", + " # check low bits for errors\n", + " if lsb in outStatus and lsb != 0:\n", + " raise ValueError(f\"Error at byte {i}L, LSB: {outStatus[lsb]} (code {lsb})\")\n", + " return True\n", + " \n", + " \"\"\"\n", + " bit position: error\n", + " 0: Normal Operation Error Flag\n", + " 9: Output Power Supply Error Flag\n", + " 10: Safety I/O Terminal Error Flag\n", + " 13: Function Block Error Flag\n", + " \"\"\"\n", + " \n", + " # rn am hoping that only one of the error flags can be set at a time\n", + " def unit_state_error(self, data):\n", + " if len(data) != 2:\n", + " raise ValueError(f\"Expected 2 bytes, but received {len(data)}.\")\n", + " \n", + " bits = self.bytes_to_binary(data)\n", + "\n", + " if bits[-1] == \"0\":\n", + " raise ValueError(f\"Unit State Error: Normal Operation Error Flag (bit 0)\")\n", + " \n", + " for k in usStatus.keys():\n", + " if bits[-(k + 1)] == \"1\":\n", + " raise ValueError(f\"Unit State Error: {usStatus[k]} (bit {k})\")\n", + "\n", + "\n", + " #TODO: Check to see if the G9 switch is allowing high Voltage or not\n", + " # this function will need to be constantly sending requests/receiving to check when the high voltage is off/on\n", + " def checkStatus():\n", + " pass\n", + "\n", + " def flush_serial(self):\n", + " self.ser.reset_input_buffer()\n", + "\n", + "\n", + " #TODO: make funtion to turn all interlocks to red\n", + " def is_connected(self):\n", + " try:\n", + " #TODO: check if this works with G9 copied from Power Supply Driver\n", + " # Attempt to write a simple command to the device\n", + " self.ser.write(b'\\r') # Send a carriage return\n", + " # Try to read a response (there might not be one)\n", + " self.ser.read(1)\n", + "\n", + " self.isConnected = True\n", + " return True\n", + " except serial.SerialException:\n", + " return False\n", + " \n", + "\n", + " #TODO: Figure out how to handle all the errors (end task)\n", + " #TODO: add a function to keep track of the driver uptime\n" + ] + }, + { + "cell_type": "code", + "execution_count": 56, + "metadata": {}, + "outputs": [], + "source": [ + "_ = G9Driver()\n", + "\n", + "res = b'\\x40\\x00\\x00\\xc3\\x00\\x00\\xcb\\x00\\x00\\x00\\x00\\xff\\x0f\\x00\\x00\\x00\\x00$\\x00\\x00\\x00\\xff\\xff\\x0f\\x00\\x00\\x00\\xff\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x08\\x00\\x12\\x00\\x9a\\x08\\xac\\x14\\x00\\x0020000012X17M\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00@\\x00\\x00\\x00\\x00\\n\\n?\\x00\\x14\\xac?\\x00\\x14\\xac?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\x9a?\\x00\\x14\\x9a\\x13\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x04\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14p\\x01\\x00\\x14p\\x06\\x00\\x14p\\x06\\x00\\x14^\\x19\\xfe\\x2A\\x0D'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "@\n", + "\\x00\\x00\n", + "\\xc3\n", + "\\x00\\x00\n", + "\\xcb\n", + "\\x00\\x00\\x00\\x00\\xff\\x0f\\x00\\x00\\x00\\x00$\\x00\\x00\\x00\\xff\\xff\\x0f\\x00\\x00\\x00\\xff\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x08\\x00\\x12\\x00\\x9a\\x08\\xac\\x14\\x00\\x0020000012X17M\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00@\\x00\\x00\\x00\\x00\\n\\n?\\x00\\x14\\xac?\\x00\\x14\\xac?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\x9a?\\x00\\x14\\x9a\\x13\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x04\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14p\\x01\\x00\\x14p\\x06\\x00\\x14p\\x06\\x00\\x14^\\x19\n", + "\\xfe\n", + "*\n", + "\n", + "Always \n", + "40\n", + "0000\n", + "Response length:\n", + "c3\n", + "End Code:\n", + "0000\n", + "Service Code:\n", + "cb\n", + "Data:\n", + "OCTD - \\x00\\x00\\x00\\x00\n", + "SITDF - \\xff\\x0f\\x00\\x00\\x00\\x00\n", + "SOTDF - $\\x00\\x00\\x00\n", + "SITSF - \\xff\\xff\\x0f\\x00\\x00\\x00\n", + "SOTSF - \\xff\\x00\\x00\\x00\n", + "Input causes - \\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\n", + "Output Causes - \\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\n", + "Reserved - \\x08\\x00\n", + "Unit Status - \\x12\\x00\n", + "ID - \\x9a\\x08\n", + "Unit Conduct time - \\xac\\x14\\x00\\x00\n", + "Reserve2 - 20000012X17M\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\n", + "Persent err - \\x00\\x00\\x00\\x00\\x00\\x00\\x00@\\x00\\x00\\x00\\x00\n", + "err cnt - \\n\n", + "ope cnt - \\n\n", + "Error - ?\\x00\\x14\\xac?\\x00\\x14\\xac?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\x9a?\\x00\\x14\\x9a\\x13\\x00\\x14\\x9a\n", + "Operation - \\x06\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x04\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14p\\x01\\x00\\x14p\\x06\\x00\\x14p\\x06\\x00\\x14^\n", + "\n", + "\n", + "Check Sum:\n", + "19fe\n", + "Always\n", + "2a" + ] + }, + { + "cell_type": "code", + "execution_count": 48, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "b'@\\x00\\x00\\xc3\\x00\\x00\\xcb\\x00\\x00\\x00\\x00\\xff\\x0f\\x00\\x00\\x00\\x00$\\x00\\x00\\x00\\xff\\xff\\x0f\\x00\\x00\\x00\\xff\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x08\\x00\\x12\\x00\\x9a\\x08\\xac\\x14\\x00\\x0020000012X17M\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00@\\x00\\x00\\x00\\x00\\n\\n?\\x00\\x14\\xac?\\x00\\x14\\xac?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\x9a?\\x00\\x14\\x9a\\x13\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x04\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14p\\x01\\x00\\x14p\\x06\\x00\\x14p\\x06\\x00\\x14^\\x19\\xfe*\\r'\n" + ] + } + ], + "source": [ + "\n", + "_.lastResponse = res\n", + "len(res)\n", + "print(_.lastResponse)" + ] + }, + { + "cell_type": "code", + "execution_count": 54, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'\\xfc'" + ] + }, + "execution_count": 54, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "header = b'\\x40\\x00\\x00\\x0F\\x4B\\x03\\x4D\\x00\\x01' # could also use bytes.fromhex() method in future for simplicity\n", + "data = b'\\x00\\x00\\x00\\x11\\x00\\x00'\n", + "\n", + "cs = _.calculate_checksum(header + data, 0 , len(header + data))\n", + "cs" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "metadata": {}, + "outputs": [ + { + "ename": "AttributeError", + "evalue": "module 'serial' has no attribute 'SerialException'", + "output_type": "error", + "traceback": [ + "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[1;31mAttributeError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[1;32mIn[39], line 277\u001b[0m, in \u001b[0;36mG9Driver.is_connected\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 274\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m 275\u001b[0m \u001b[38;5;66;03m#TODO: check if this works with G9 copied from Power Supply Driver\u001b[39;00m\n\u001b[0;32m 276\u001b[0m \u001b[38;5;66;03m# Attempt to write a simple command to the device\u001b[39;00m\n\u001b[1;32m--> 277\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mser\u001b[49m\u001b[38;5;241m.\u001b[39mwrite(\u001b[38;5;124mb\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;130;01m\\r\u001b[39;00m\u001b[38;5;124m'\u001b[39m) \u001b[38;5;66;03m# Send a carriage return\u001b[39;00m\n\u001b[0;32m 278\u001b[0m \u001b[38;5;66;03m# Try to read a response (there might not be one)\u001b[39;00m\n", + "\u001b[1;31mAttributeError\u001b[0m: 'G9Driver' object has no attribute 'ser'", + "\nDuring handling of the above exception, another exception occurred:\n", + "\u001b[1;31mAttributeError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[1;32mIn[42], line 2\u001b[0m\n\u001b[0;32m 1\u001b[0m \u001b[38;5;66;03m# _.response()\u001b[39;00m\n\u001b[1;32m----> 2\u001b[0m \u001b[43m_\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msendCommand\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", + "Cell \u001b[1;32mIn[39], line 66\u001b[0m, in \u001b[0;36mG9Driver.sendCommand\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 64\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21msendCommand\u001b[39m(\u001b[38;5;28mself\u001b[39m):\n\u001b[0;32m 65\u001b[0m \u001b[38;5;66;03m# TODO: frontend topic : decided how we want to display the exception\u001b[39;00m\n\u001b[1;32m---> 66\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mis_connected\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m:\n\u001b[0;32m 67\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mConnectionError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mSeiral Port is Not Open.\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[0;32m 68\u001b[0m header \u001b[38;5;241m=\u001b[39m \u001b[38;5;124mb\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;130;01m\\x40\u001b[39;00m\u001b[38;5;130;01m\\x00\u001b[39;00m\u001b[38;5;130;01m\\x00\u001b[39;00m\u001b[38;5;130;01m\\x0F\u001b[39;00m\u001b[38;5;130;01m\\x4B\u001b[39;00m\u001b[38;5;130;01m\\x03\u001b[39;00m\u001b[38;5;130;01m\\x4D\u001b[39;00m\u001b[38;5;130;01m\\x00\u001b[39;00m\u001b[38;5;130;01m\\x01\u001b[39;00m\u001b[38;5;124m'\u001b[39m \u001b[38;5;66;03m# could also use bytes.fromhex() method in future for simplicity\u001b[39;00m\n", + "Cell \u001b[1;32mIn[39], line 283\u001b[0m, in \u001b[0;36mG9Driver.is_connected\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 281\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39misConnected \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[0;32m 282\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[1;32m--> 283\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[43mserial\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mSerialException\u001b[49m:\n\u001b[0;32m 284\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;01mFalse\u001b[39;00m\n", + "\u001b[1;31mAttributeError\u001b[0m: module 'serial' has no attribute 'SerialException'" + ] + } + ], + "source": [ + "\n", + "# _.response()\n", + "_.sendCommand()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "temp = b'?\\x00\\x14\\xac?\\x00\\x14\\xac?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\xa0?\\x00\\x14\\x9a?\\x00\\x14\\x9a\\x13\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x04\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14p\\x01\\x00\\x14p\\x06\\x00\\x14p\\x06\\x00\\x14^'\n", + "temp[40:]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "len(b'\\x06\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x04\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14p\\x01\\x00\\x14p\\x06\\x00\\x14p\\x06\\x00\\x14^')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "_ = res[21:27]\n", + "len(_) == 6" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 60, + "metadata": {}, + "outputs": [], + "source": [ + "usStatus = {\n", + " 9: \"Output Power Supply Error Flag\",\n", + " 10: \"Safety I/O Terminal Error Flag\",\n", + " 13: \"Function Block Error Flag\"\n", + "}\n", + "\n", + "\n", + "# rn am hoping that only one of the error flags can be set at a time\n", + "def unit_state_error(data):\n", + " if len(data) != 2:\n", + " raise ValueError(f\"Expected 2 bytes, but received {len(data)}.\")\n", + " \n", + " bits = bytes_to_binary(data)\n", + " print(bits, \" \", bits[-1])\n", + "\n", + " if bits[-1] == \"0\":\n", + " raise ValueError(f\"Unit State Error: Normal Operation Error Flag (bit 0)\")\n", + " \n", + " for k in usStatus.keys():\n", + " if bits[-(k + 1)] == \"1\":\n", + " raise ValueError(f\"Unit State Error: {usStatus[k]} (bit {k})\")\n", + "\n", + " \n", + "\n", + " \n", + " # # if er in usStatus:\n", + " # # raise ValueError(f\"Unit State Error: {usStatus[er]} (code {er})\")\n", + " # return True\n", + "\n", + "# helper function to convert bytes to bits for checking flags\n", + "# not currently being used but many be helpful in the future for getting errors\n", + "def bytes_to_binary(byte_string):\n", + " return ''.join(format(byte, '08b') for byte in byte_string)\n", + "\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 61, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'1'" + ] + }, + "execution_count": 61, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "bytes_to_binary(b'\\x00\\x01')[-1]" + ] + }, + { + "cell_type": "code", + "execution_count": 62, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0000000000000001 1\n" + ] + } + ], + "source": [ + "okstat = b'\\x00\\x01'\n", + "unit_state_error(okstat)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0000000000000000 0\n", + "Unit State Error: Normal Operation Error Flag (bit 0)\n" + ] + } + ], + "source": [ + "notstat = b'\\x00\\x00'\n", + "try:\n", + " unit_state_error(notstat)\n", + "except ValueError as e:\n", + " print(e)" + ] + }, + { + "cell_type": "code", + "execution_count": 64, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0000001000000001 1\n", + "Unit State Error: Output Power Supply Error Flag (bit 9)\n" + ] + } + ], + "source": [ + "notstat2 = b'\\x02\\x01'\n", + "try:\n", + " unit_state_error(notstat2)\n", + "except ValueError as e:\n", + " print(e)" + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0000010000000001 1\n", + "Unit State Error: Safety I/O Terminal Error Flag (bit 10)\n" + ] + } + ], + "source": [ + "notstat2 = b'\\x04\\x01'\n", + "try:\n", + " unit_state_error(notstat2)\n", + "except ValueError as e:\n", + " print(e)" + ] + }, + { + "cell_type": "code", + "execution_count": 66, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0010000000000001 1\n", + "Unit State Error: Function Block Error Flag (bit 13)\n" + ] + } + ], + "source": [ + "notstat2 = b'\\x20\\x01'\n", + "try:\n", + " unit_state_error(notstat2)\n", + "except ValueError as e:\n", + " print(e)" + ] + }, + { + "cell_type": "code", + "execution_count": 67, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'0000000000000001'" + ] + }, + "execution_count": 67, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "bytes_to_binary(b'\\x00\\x01')" + ] + }, + { + "cell_type": "code", + "execution_count": 69, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11]" + ] + }, + "execution_count": 69, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "bits = []\n", + "for i in range(12):\n", + " if \"0\" == bytes_to_binary(b'\\x00\\x01')[-i+1]:\n", + " bits.append(i)\n", + "\n", + "bits" + ] + }, + { + "cell_type": "code", + "execution_count": 74, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 74, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "US = res[73:75]\n", + "US != 1" + ] + }, + { + "cell_type": "code", + "execution_count": 75, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'\\x12\\x00'" + ] + }, + "execution_count": 75, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "US" + ] + }, + { + "cell_type": "code", + "execution_count": 82, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 82, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "b'\\x00\\x01' == b'\\x00\\x01'" + ] + }, + { + "cell_type": "code", + "execution_count": 91, + "metadata": {}, + "outputs": [], + "source": [ + "lengthTest = b'@\\x00\\x00\\xc3\\x00\\x00\\xcb\\x00\\x00\\x00\\x00\\x00\\x08\\x00\\x00\\x00\\x00C\\x00\\x00\\x00\\xff\\xff\\x0f\\x00\\x00\\x00\\xff\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x08\\x00\\x13\\x00\\x9a\\x08\\xbe\\x14\\x00\\x0020000012X17M\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\n\\n?\\x00\\x14\\xb8?\\x00\\x14\\xb8?\\x00\\x14\\xb8?\\x00\\x14\\xb8?\\x00\\x14\\xb2?\\x00\\x14\\xac?\\x00\\x14\\xac?\\x00\\x14\\xac?\\x00\\x14\\xa0?\\x00\\x14\\xa0\\x06\\x00\\x14\\xb2\\x01\\x00\\x14\\xb2\\x01\\x00\\x14\\xb2\\x01\\x00\\x14\\xb2\\x06\\x00\\x14\\xb2\\x01\\x00\\x14\\xb2\\x06\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x01\\x00\\x14\\x9a\\x06\\x00\\x14\\x9a\\x1a\\xe8*\\r'" + ] + }, + { + "cell_type": "code", + "execution_count": 92, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "195" + ] + }, + "execution_count": 92, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "lengthTest[3]" + ] + }, + { + "cell_type": "code", + "execution_count": 95, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'@'" + ] + }, + "execution_count": 95, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "b'\\x40'" + ] + }, + { + "cell_type": "code", + "execution_count": 99, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "b'\\x00\\x08\\x00\\x00\\x00\\x00'\n", + "b'\\x00\\x00\\x00'\n" + ] + } + ], + "source": [ + "_ = lengthTest[11:17]\n", + "print(_)\n", + "print(_[-3:])" + ] + }, + { + "cell_type": "code", + "execution_count": 102, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "10" + ] + }, + "execution_count": 102, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(lengthTest[31:55][-10:])" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 00000000..79f3b5d2 --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1,12 @@ +# __init__.py +from .g9_driver import G9Driver +from .errorTests import TestG9Driver + +from .g9_driver import G9Driver # Updated relative import + +# Running the tests (create scriptsG9Drivertestrun_tests.py) +import unittest +from errorTests import TestG9Driver + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/scripts/test.py b/scripts/test.py new file mode 100644 index 00000000..040666b4 --- /dev/null +++ b/scripts/test.py @@ -0,0 +1,69 @@ +import serial +import time + +ser = serial.Serial("COM11", 9600, parity=serial.PARITY_EVEN, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=1) +# save data method +def saveData(data, Testnum): + data = str(data) + # print(data) + with open(f'test{Testnum}.txt', "w") as f: + # this works if it is in hex with or without leading 0x + # binary = bin(int(data, base=16))[2:] + f.write(data) + +def saveBytes(bytes, Testnum): + with open(f'bytes{Testnum}.txt', "wb") as f: + # this works if it is in hex with or without leading 0x + f.write(bytes) + + +always = b"0x4000000F4B034D0001" +# sum 00EB +data = b"00000000" +reserve = b"0000" +checkSum = b"00EB" +always2 = b"2A0D" + +# message = b"4000000F4B034D000100000000000000EB2A0D" + +message = b'\x40\x00\x00\x0F\x4B\x03\x4D\x00\x01\x00\x00\x00\x00\x00\x00\x12\x49\x2A\x0D' +msg2 = b'\x40\x00\x00\x0F\x4B\x03\x4D\x00\x01\x00\x00\x00\x00\x00\x80\x01\x6B\x2A\x0D' +msg3 = b'\x40\x00\x00\x0F\x4B\x03\x4D\x00\x01\xFF\xFF\xFF\xFF\x00\x00\x04\xE7\x2A\x0D' +# msg4 = b'\x21\x3a\xe3\x87\x40' + +message_split = [ + 0x40, #Start code + 0x00, + 0x00, + 0x0F, + 0x4B, + 0x03, + 0x3D, + 0X00, + 0X01, + 0x00, 0x00, 0x00, 0x00, # Optional Data + 0x00, 0x00, #Reserved bytes + 0x00, 0x00EB, + 0x2A, + 0X0D +] + + + +# print(ser.read(size=10)) +_ = ser.write(message) +print(message) +print(_) + +#data = ser.read_until(b'\x0D') +data = ser.read(size=198) + +#print(ser.read(size=10)) + +ser.close() + + +print(data) + + +saveData(data, 2) \ No newline at end of file diff --git a/subsystem/interlocks.py b/subsystem/interlocks.py index dc58e489..6c9171df 100644 --- a/subsystem/interlocks.py +++ b/subsystem/interlocks.py @@ -1,60 +1,260 @@ # interlocks.py import tkinter as tk import os, sys +import instrumentctl.g9_driver as g9_driv +from utils import LogLevel +import time -def resource_path(relative_path): - """ Get the absolute path to a resource, works for development and when running as bundled executable""" - try: - # PyInstaller creates a temp folder and stores path in _MEIPASS - base_path = sys._MEIPASS - except AttributeError: - base_path = os.path.abspath(".") +class InterlocksSubsystem: + # the bit poistion for each interlock + INPUTS = { + 0 : "E-STOP Int", # Chassis Estop + 1 : "E-STOP Int", # Chassis Estop + 2 : "E-STOP Ext", # Peripheral Estop + 3 : "E-STOP Ext", # Peripheral Estop + 4 : "Door", # Door + 5 : "Door", # Door Lock + 6 : "Vacuum Power", # Vacuum Power + 7 : "Vacuum Pressure", # Vacuum Pressure + 8 : "High Oil", # Oil High + 9 : "Low Oil", # Oil Low + 10 : "Water", # Water + 11 : "HVolt ON", # HVolt ON + 12 : "G9SP Active" # G9SP Active + } - return os.path.join(base_path, relative_path) + INDICATORS = { + 'Door': None, + 'Water': None, + 'Vacuum Power': None, + 'Vacuum Pressure': None, + 'Low Oil': None, + 'High Oil': None, + 'E-STOP Int': None, + 'E-STOP Ext': None, + 'All Interlocks': None, + 'G9SP Active': None, + 'HVolt ON': None + } -class InterlocksSubsystem: - def __init__(self, parent, logger=None): + def __init__(self, parent, com_ports, logger=None, frames=None): self.parent = parent self.logger = logger - self.interlock_status = { - "Vacuum": True, "Water": False, "Door": False, "Timer": True, - "Oil High": False, "Oil Low": False, "E-stop Ext": True, - "E-stop Int": True, "G9SP Active": True - } + self.frames = frames + self.last_error_time = 0 # Track last error time + self.error_count = 0 # Track consecutive errors + self.update_interval = 500 # Default update interval (ms) + self.max_interval = 5000 # Maximum update interval (ms) + self._last_status = None self.setup_gui() + try: + if com_ports is not None: # Better comparison + try: + self.driver = g9_driv.G9Driver(com_ports, logger=self.logger) + self.log("G9 driver initialized", LogLevel.INFO) + except Exception as e: + self.log(f"Failed to connect: {e}", LogLevel.ERROR) + self._set_all_indicators('red') + else: + self.driver = None + self.log("No COM port provided for G9 driver", LogLevel.WARNING) + self._set_all_indicators('red') + except Exception as e: + self.driver = None + self.log(f"Failed to initialize G9 driver: {str(e)}", LogLevel.WARNING) + self._set_all_indicators('red') + + self.parent.after(self.update_interval, self.update_data) + + def update_com_port(self, com_port): + """ + Update the COM port and reinitialize the driver + + Catch: + Expection: If inilizition throws an error + """ + if com_port: + try: + new_driver = g9_driv.G9Driver(com_port, logger=self.logger) + # Test connection by getting status + new_driver.get_interlock_status() + self.driver = new_driver + self.log(f"G9 driver updated to port {com_port}", LogLevel.INFO) + except Exception as e: + self.log(f"Failed to update G9 driver: {str(e)}", LogLevel.ERROR) + self._set_all_indicators('red') + else: + self._set_all_indicators('red') + self.log("update_com_port is being called without a com port", LogLevel.ERROR) + + def _adjust_update_interval(self, success=True): + """Adjust the polling interval based on connection success/failure""" + if success: + # On success, return to normal update rate + self.error_count = 0 + self.update_interval = 500 # Reset to default interval + else: + # On communication failure, use exponential backoff with a cap + self.error_count = min(self.error_count + 1, 5) # Cap error count + self.update_interval = min(500 * (2 ** self.error_count), self.max_interval) + def setup_gui(self): + """Setup the GUI for the interlocks subsystem""" + self._create_main_frame() + interlocks_frame = self._create_interlocks_frame() + self._create_indicators(interlocks_frame) + + def _create_main_frame(self): + """Create and configure the main container frame""" self.interlocks_frame = tk.Frame(self.parent) self.interlocks_frame.pack(fill=tk.BOTH, expand=True) + + # Configure grid weights for responsive layout + self.parent.grid_rowconfigure(0, weight=1) + self.parent.grid_columnconfigure(0, weight=1) + self.interlocks_frame.grid_rowconfigure(0, weight=1) + self.interlocks_frame.grid_columnconfigure(0, weight=1) + self.interlocks_frame.grid(row=0, column=0, sticky='nsew') + + def _create_interlocks_frame(self): + """Create the frame that will contain the interlock indicators""" + interlocks_frame = tk.Frame(self.interlocks_frame, highlightbackground="black") + interlocks_frame.grid(row=0, column=0, padx=0, pady=0, sticky="nsew") + + # Configure columns for indicator pairs (label + light) + num_columns = 22 + for i in range(num_columns): + interlocks_frame.grid_columnconfigure(i, weight=1) + + return interlocks_frame + + def _create_indicator_circle(self, frame, color): + """Create a circular indicator light""" + canvas = tk.Canvas(frame, width=30, height=30, highlightthickness=0) + canvas.grid(sticky='nsew') + oval_id = canvas.create_oval(5, 5, 25, 25, fill=color, outline="black") + return canvas, oval_id + + def _create_indicators(self, frame): + """Create all indicator lights and their labels""" + for i, (name, _) in enumerate(self.INDICATORS.items()): + # Create label + tk.Label(frame, text=name, anchor="center").grid( + row=0, column=i*2, sticky='ew' + ) + + # Create indicator light + canvas, oval_id = self._create_indicator_circle(frame, 'red') + canvas.grid(row=0, column=i*2+1, sticky='nsew') + self.INDICATORS[name] = (canvas, oval_id) + + # updates indicator and logs updates + def update_interlock(self, name, safety, data): + """Update individual interlock indicator""" + if name not in self.INDICATORS or safety == None or data == None: + self.log("Invalid inputs to update_interlock", LogLevel.ERROR) + + color = 'green' if (safety & data) == 1 else 'red' + + if name in self.INDICATORS: + canvas, oval_id = self.INDICATORS[name] + current_color = canvas.itemcget(oval_id, 'fill') + if current_color != color: + canvas.itemconfig(oval_id, fill=color) + self.log(f"Interlock {name}: {current_color} -> {color}", LogLevel.INFO) + + def _set_all_indicators(self, color): + """Set all indicators to specified color""" + if color == None or color == "": + self.log("Invalid inputs to _set_all_indicators", LogLevel.ERROR) + + if self.INDICATORS: + for name in self.INDICATORS: + canvas, oval_id = self.INDICATORS[name] + current_color = canvas.itemcget(oval_id, 'fill') + if current_color != color: + canvas.itemconfig(oval_id, fill=color) + self.log(f"Interlock {name}: {current_color} -> {color}", LogLevel.INFO) + + def update_data(self): + """ + Update interlock status + + Finally: Will always schedule the next time to refresh data + + Catch: + ConnectionError: Thrown from G9Driver when serial connection throws error + ValueError: Thrown from G9Driver when unexpected responce is recieved + + Exception: If anything else in message process throws an error + + """ + current_time = time.time() + try: + if not self.driver or not self.driver.is_connected(): + if current_time - self.last_error_time > (self.update_interval / 1000): + self._set_all_indicators('red') + self.log("G9 driver not connected", LogLevel.WARNING) + self.last_error_time = current_time + self.last_error_time = time.time() + self._adjust_update_interval(success=False) + else: + # Get interlock status from driver + status = self.driver.get_interlock_status() + + if status is None: + self._set_all_indicators('red') + if current_time - self.last_error_time > (self.update_interval / 1000): + self.log("No data available from G9", LogLevel.CRITICAL) + self.last_error_time = current_time + self._adjust_update_interval(success=False) + self.parent.after(self.update_interval, self.update_data) + return + + sitsf_bits, sitdf_bits, g9_active = status + + # Process dual-input interlocks (first 3 pairs) + for i in range(3): + safety = (sitsf_bits[i*2] & + sitsf_bits[i*2+1]) + data = (sitdf_bits[i*2] & + sitdf_bits[i*2+1]) + + self.update_interlock(self.INPUTS[i*2], safety, data) + + # Process single-input interlocks + for i in range(6, 13): + safety = sitsf_bits[i] + data = sitdf_bits[i] + self.update_interlock(self.INPUTS[i], safety, data) + + # Update overall status + all_good = sitsf_bits[:12] == sitdf_bits[:12] == [1] * 12 + self.update_interlock("All Interlocks", True, all_good) + + # make sure that the data output indicates button and been pressed and the input is not off/error + if g9_active == sitsf_bits[12]: + self.update_interlock("G9SP Active", True, all_good) + + self._adjust_update_interval(success=True) + + except Exception as e: + if time.time() - self.last_error_time > (self.update_interval / 1000): + self.log(f"Unexpected error: {str(e)}", LogLevel.ERROR) + self._set_all_indicators('red') + self.last_error_time = time.time() + self._adjust_update_interval(success=False) + + finally: + # Schedule next update + if self.driver: + self.parent.after(self.update_interval, self.update_data) + + def log(self, message, level=LogLevel.INFO): + """Log a message with the specified level if a logger is configured.""" + if self.logger: + self.logger.log(message, level) + else: + print(f"{level.name}: {message}") - interlock_labels = [ - "Vacuum", "Water", "Door", "Timer", "Oil High", - "Oil Low", "E-stop Ext", "E-stop Int", "G9SP Active" - ] - self.indicators = { - 'active': tk.PhotoImage(file=resource_path("media/off_orange.png")), - 'inactive': tk.PhotoImage(file=resource_path("media/on.png")) - } - - for label in interlock_labels: - frame = tk.Frame(self.interlocks_frame) - frame.pack(side=tk.LEFT, expand=True, padx=5) - - lbl = tk.Label(frame, text=label, font=("Helvetica", 8)) - lbl.pack(side=tk.LEFT) - status = self.interlock_status[label] - indicator = tk.Label(frame, image=self.indicators['active'] if status else self.indicators['inactive']) - indicator.pack(side=tk.RIGHT, pady=1) - frame.indicator = indicator # Store reference to the indicator for future updates - - def update_interlock(self, name, status): - if name in self.parent.children: - frame = self.parent.children[name] - indicator = frame.indicator - new_image = self.indicators['active'] if status else self.indicators['inactive'] - indicator.config(image=new_image) - indicator.image = new_image # Keep a reference - - def update_pressure_dependent_locks(self, pressure): - # Disable the Vacuum lock if pressure is below 2 mbar - self.update_interlock("Vacuum", pressure >= 2) \ No newline at end of file diff --git a/subsystem/interlocks_tester.ipynb b/subsystem/interlocks_tester.ipynb new file mode 100644 index 00000000..e69de29b diff --git a/usr/Agilent_33120manual.pdf b/usr/Agilent_33120manual.pdf deleted file mode 100644 index 0726cb08..00000000 Binary files a/usr/Agilent_33120manual.pdf and /dev/null differ diff --git a/usr/DOC-MANUAL-MC-Apex.pdf b/usr/DOC-MANUAL-MC-Apex.pdf deleted file mode 100644 index df4e6773..00000000 Binary files a/usr/DOC-MANUAL-MC-Apex.pdf and /dev/null differ diff --git a/usr/__pycache__/panel_config.cpython-312.pyc b/usr/__pycache__/panel_config.cpython-312.pyc deleted file mode 100644 index d05fde4f..00000000 Binary files a/usr/__pycache__/panel_config.cpython-312.pyc and /dev/null differ