Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interlocks Subsystem and G9 Driver Cleanup #27

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions instrumentctl/g9_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class G9Driver:
CHECKSUM_HIGH = 195 # G9 Response Checksum
CHECKSUM_LOW = 196 # G9 Response Checksum

# Status dictionaries
# the bit poistion for each interlock
IN_STATUS = {
0: "No error",
1: "Invalid configuration",
Expand Down Expand Up @@ -58,6 +58,7 @@ class G9Driver:
}

def __init__(self, port=None, baudrate=9600, timeout=0.5, logger=None, debug_mode=False):

self.logger = logger
self.debug_mode = debug_mode
self._setup_serial(port, baudrate, timeout)
Expand Down Expand Up @@ -223,7 +224,7 @@ def _validate_response_format(self, data):
Validate basic response format

Raise:
ValueError: if formate is not as expected
ValueError: if format is not as expected
"""
if data == None:
raise ValueError("Invalid inputs to _validate_response_format: Data is None")
Expand Down Expand Up @@ -258,6 +259,7 @@ def _validate_checksum(self, data):
Raise:
ValueError: Calculated check sum does not match
"""

if data == None:
raise ValueError("Invalid inputs to _validate_checksum: Data is None")

Expand Down
126 changes: 82 additions & 44 deletions subsystem/interlocks.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
# interlocks.py
import tkinter as tk
import os, sys
import instrumentctl.g9_driver as g9_driv
from utils import LogLevel
import time

class InterlocksSubsystem:
# the bit poistion for each interlock
""" Manages the interlock subsystem interface and status monitoring """

# Bit position mappings for each interlock
INPUTS = {
0 : "E-STOP Int", # Chassis Estop
1 : "E-STOP Int", # Chassis Estop
2 : "E-STOP Ext", # Peripheral Estop
3 : "E-STOP Ext", # Peripheral Estop
4 : "Door", # Door
5 : "Door", # Door Lock
6 : "Vacuum Power", # Vacuum Power
0 : "E-STOP Int", # Chassis Estop
1 : "E-STOP Int", # Chassis Estop
2 : "E-STOP Ext", # Peripheral Estop
3 : "E-STOP Ext", # Peripheral Estop
4 : "Door", # Door
5 : "Door", # Door Lock
6 : "Vacuum Power", # Vacuum Power
7 : "Vacuum Pressure", # Vacuum Pressure
8 : "High Oil", # Oil High
9 : "Low Oil", # Oil Low
10 : "Water", # Water
11 : "HVolt ON", # HVolt ON
12 : "G9SP Active" # G9SP Active
8 : "High Oil", # Oil High
9 : "Low Oil", # Oil Low
10 : "Water", # Water
11 : "HVolt ON", # HVolt ON
12 : "G9SP Active" # G9SP Active
}

INDICATORS = {
Expand All @@ -41,13 +42,16 @@ def __init__(self, parent, com_ports, logger=None, frames=None):
self.parent = parent
self.logger = logger
self.frames = frames
self.last_error_time = 0 # Track last error time
self.error_count = 0 # Track consecutive errors
self.last_error_time = 0 # Track last error time
self.error_count = 0 # Track consecutive errors
self.update_interval = 500 # Default update interval (ms)
self.max_interval = 5000 # Maximum update interval (ms)
self.max_interval = 5000 # Maximum update interval (ms)
self._last_status = None
self.setup_gui()
self._initialize_driver(com_ports)
self.parent.after(self.update_interval, self.update_data)

def _initialize_driver(self, com_ports):
try:
if com_ports is not None: # Better comparison
try:
Expand All @@ -64,15 +68,16 @@ def __init__(self, parent, com_ports, logger=None, frames=None):
self.driver = None
self.log(f"Failed to initialize G9 driver: {str(e)}", LogLevel.WARNING)
self._set_all_indicators('red')

self.parent.after(self.update_interval, self.update_data)

def update_com_port(self, com_port):
"""
Update the COM port and reinitialize the driver

Args:
com_port: New port to use

Catch:
Expection: If inilizition throws an error
Raises:
Exception: If initialization fails
"""
if com_port:
try:
Expand All @@ -86,18 +91,29 @@ def update_com_port(self, com_port):
self._set_all_indicators('red')
else:
self._set_all_indicators('red')
self.log("update_com_port is being called without a com port", LogLevel.ERROR)
self.log(
"update_com_port is being called without a com port",
LogLevel.ERROR
)

def _adjust_update_interval(self, success=True):
"""Adjust the polling interval based on connection success/failure"""
"""
Adjust the polling interval based on connection success/failure

Args:
success: Boolean indicating if the last update was successful
"""
if success:
# On success, return to normal update rate
self.error_count = 0
self.update_interval = 500 # Reset to default interval
else:
# On communication failure, use exponential backoff with a cap
self.error_count = min(self.error_count + 1, 5) # Cap error count
self.update_interval = min(500 * (2 ** self.error_count), self.max_interval)
self.update_interval = min(
500 * (2 ** self.error_count),
self.max_interval
)

def setup_gui(self):
"""Setup the GUI for the interlocks subsystem"""
Expand Down Expand Up @@ -130,7 +146,16 @@ def _create_interlocks_frame(self):
return interlocks_frame

def _create_indicator_circle(self, frame, color):
"""Create a circular indicator light"""
"""
Create a circular indicator light

Args:
frame: Parent frame for the indicator
color: Initial color of the indicator

Returns:
tuple: (canvas, oval_id) for the created indicator
"""
canvas = tk.Canvas(frame, width=30, height=30, highlightthickness=0)
canvas.grid(sticky='nsew')
oval_id = canvas.create_oval(5, 5, 25, 25, fill=color, outline="black")
Expand All @@ -151,8 +176,15 @@ def _create_indicators(self, frame):

# updates indicator and logs updates
def update_interlock(self, name, safety, data):
"""Update individual interlock indicator"""
if name not in self.INDICATORS or safety == None or data == None:
"""
Update individual interlock indicator

Args:
name: interlock to update
safety: Safety status bit
data: Data status bit
"""
if name not in self.INDICATORS or safety is None or data is None:
self.log("Invalid inputs to update_interlock", LogLevel.ERROR)

color = 'green' if (safety & data) == 1 else 'red'
Expand All @@ -162,11 +194,14 @@ def update_interlock(self, name, safety, data):
current_color = canvas.itemcget(oval_id, 'fill')
if current_color != color:
canvas.itemconfig(oval_id, fill=color)
self.log(f"Interlock {name}: {current_color} -> {color}", LogLevel.INFO)
self.log(
f"Interlock {name}: {current_color} -> {color}",
LogLevel.INFO
)

def _set_all_indicators(self, color):
"""Set all indicators to specified color"""
if color == None or color == "":
if color is None or color == "":
self.log("Invalid inputs to _set_all_indicators", LogLevel.ERROR)

if self.INDICATORS:
Expand All @@ -183,11 +218,10 @@ def update_data(self):

Finally: Will always schedule the next time to refresh data

Catch:
ConnectionError: Thrown from G9Driver when serial connection throws error
ValueError: Thrown from G9Driver when unexpected responce is recieved

Exception: If anything else in message process throws an error
Catches:
ConnectionError: From G9Driver when serial connection fails
ValueError: From G9Driver when unexpected response is received
Exception: For any other unexpected errors

"""
current_time = time.time()
Expand All @@ -197,7 +231,6 @@ def update_data(self):
self._set_all_indicators('red')
self.log("G9 driver not connected", LogLevel.WARNING)
self.last_error_time = current_time
self.last_error_time = time.time()
self._adjust_update_interval(success=False)
else:
# Get interlock status from driver
Expand All @@ -216,15 +249,18 @@ def update_data(self):

# Process dual-input interlocks (first 3 pairs)
for i in range(3):
safety = (sitsf_bits[i*2] &
sitsf_bits[i*2+1])
data = (sitdf_bits[i*2] &
sitdf_bits[i*2+1])

safety = (
sitsf_bits[i*2] &
sitsf_bits[i*2+1]
)
data = (
sitdf_bits[i*2] &
sitdf_bits[i*2+1]
)
self.update_interlock(self.INPUTS[i*2], safety, data)

# Process single-input interlocks
for i in range(6, 13):
for i in range(6, 12):
safety = sitsf_bits[i]
data = sitdf_bits[i]
self.update_interlock(self.INPUTS[i], safety, data)
Expand All @@ -234,16 +270,18 @@ def update_data(self):
self.update_interlock("All Interlocks", True, all_good)

# make sure that the data output indicates button and been pressed and the input is not off/error
if g9_active == sitsf_bits[12]:
if g9_active == sitsf_bits[12] == 1:
self.update_interlock("G9SP Active", True, all_good)
else:
self.update_interlock("G9SP Active", False, all_good)

self._adjust_update_interval(success=True)

except Exception as e:
if time.time() - self.last_error_time > (self.update_interval / 1000):
if current_time - self.last_error_time > (self.update_interval / 1000):
self.log(f"Unexpected error: {str(e)}", LogLevel.ERROR)
self._set_all_indicators('red')
self.last_error_time = time.time()
self.last_error_time = current_time
self._adjust_update_interval(success=False)

finally:
Expand Down
Empty file removed subsystem/interlocks_tester.ipynb
Empty file.