diff --git a/python/tele-op/__init__.py b/python/hlc/__init__.py similarity index 100% rename from python/tele-op/__init__.py rename to python/hlc/__init__.py diff --git a/python/hlc/main.py b/python/hlc/main.py new file mode 100644 index 0000000..e69de29 diff --git a/python/serial/__init__.py b/python/serial/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/tele-op/run_tele_op.py b/python/serial/run_tele_op.py similarity index 62% rename from python/tele-op/run_tele_op.py rename to python/serial/run_tele_op.py index 82de98f..f8a37f0 100644 --- a/python/tele-op/run_tele_op.py +++ b/python/serial/run_tele_op.py @@ -11,33 +11,6 @@ from pid.position_feedback import Controller -def get_motor_angles_from_aruco() -> tuple[float, float, float] | tuple[None, None, None]: - """Helper function to encapsulate the aruco/kinematics pipeline - https://github.com/thomasonzhou/MTE380_Software/blob/03967e148c3493c3d2852fd17dae668f0a841446/python/run_homing.py - - Returns: - tuple of 3 floats OR tuple of 3 Nones: Motor angles (MotorA, MotorB, MotorC) or failure - """ - detect_aruco = Popen("./aruco_detection") # THIS IS THOMASON'S BINARY - time.sleep(2) - calibrated = False - motor_angles = (0, 0, 0) - try: - while not calibrated: - data = read_aruco_data() - print(data) - if data is None: - continue - x, y, z, distance = data - motor_angles = translate_N_to_motor_angles(np.array([float(x), float(y), float(z)]), float(distance)) - return motor_angles - - except Exception as e: - print(e) - detect_aruco.kill() - return (None, None, None) - - if __name__ == "__main__": # Setup desired_coord = (0,0) diff --git a/python/serial/serial_utils.py b/python/serial/serial_utils.py new file mode 100644 index 0000000..01e4be7 --- /dev/null +++ b/python/serial/serial_utils.py @@ -0,0 +1,79 @@ +import serial +import platform +import time + +tare_timeout = 1 + +class MotorSerial: + + def __init__(self, baudrate: int): + """Initialize the required serial ports + + Args: + baudrate (int): Serial baudrate + """ + self.baudrate = baurdate + computeOS = platform.system() + if computeOS == 'Linux': + output_com_port = '/dev/ttyUSB0' + elif computeOS == 'Windows': + output_com_port = 'COM5' + + try: + motor_serial = serial.Serial(output_com_port, baudrate, timeout=1) + except serial.SerialException as e: + print(f"Error: {e}") + raise Exception("Failed to initalize serial!") + else: + print(f"Serial Initialized") + self.motor_serial = motor_serial + + def send_encoded_motor_commands(motor_command: str, motor_angles: tuple[float, float, float]): + """ASCII encodes a given motor command and set of motor angles in the expected format + + Args: + motor_command (character): Defines the mode of operation + motor_angles (tuple of 3 floats, (motorA, motorB, motorC)): Angles of the motors to be operated on (rad) + """ + motorA, motorB, motorC = motor_angles + motor_output = f"<{motor_command}, {motorA:.2f}, {motorB:.2f}, {motorC:.2f}>".encode('ascii') + + self.motor_serial.write(motor_output) + + def tare_motors(zero_angles: tuple[float, float, float]) -> bool: + """Helper function to send the tare command and waits for confirmation + + Args: + zero_angles (tuple of 3 floats, (motorA, motorB, motorC)): Angles of the motors' current position (rad) + Returns: + State of motor tare operation: True if Tare successful and false if unsuccessful + """ + # Tare motors given a tuple of angles + self.send_encoded_motor_commands(self.motor_serial, 't', (zero_angles)) + print("Sent 'TARE' Command") + + # Wait for Tare confirmation to be received + timeout_start = time.time() + received_message = "" + while time.time() < timeout_start + tare_timeout: + if self.motor_serial.in_waiting > 0: + serial_data = self.motor_serial.read(self.motor_serial.in_waiting) + try: + # Decode the data as ASCII and strip extraneous characters + received_message += serial_data.decode('ascii').strip() + # Check if "TARE" is received + if "TARE" in received_message: + print("'TARE' Confirmed") + return True + except UnicodeDecodeError: + print("Received Non-ASCII Data") + time.sleep(0.01) + + print("Timeout Error: 'TARE' Not Confirmed") + return False + + def close_serial(motor_serial): + """Helper function to close the serial communications""" + motor_serial.close() + print("Ports Closed") + \ No newline at end of file diff --git a/python/tele-op/parse_process.py b/python/tele-op/parse_process.py deleted file mode 100644 index ccff5c0..0000000 --- a/python/tele-op/parse_process.py +++ /dev/null @@ -1,19 +0,0 @@ -import re - -# Expected ASCII data in the format -format = re.compile(r"<(-?\d+\.\d+),\s*(-?\d+\.\d+)>") - -def parse_coord(data): - match = format.match(data) - if match: - x = float(match.group(1)) - y = float(match.group(2)) - return x, y - else: - # print("Data format error") - return None - -def ascii_encode(motor_angles): - a, b, c = motor_angles - return f"".encode('ascii') - diff --git a/python/tele-op/serial_forward.py b/python/tele-op/serial_forward.py deleted file mode 100644 index fc44c8b..0000000 --- a/python/tele-op/serial_forward.py +++ /dev/null @@ -1,103 +0,0 @@ -import serial -import time -import platform - -from parse_process import parse_coord, ascii_encode -from pid.position_feedback import Controller -from kinematics.wrappers import translate_dir_to_motor_angles - -pid = Controller() -desired_coord = (0,0) -dir_x, dir_y, theta_mag = 0,0,0 - -computeOS = platform.system() -if computeOS == 'Linux': - input_com_port = '/dev/ttyACM0' - output_com_port = '/dev/ttyUSB0' -elif computeOS == 'Windows': - input_com_port = 'COM8' - output_com_port = 'COM7' - -baudrate = 115200 - -try: - ser_in = serial.Serial(input_com_port, baudrate, timeout=1) - ser_out = serial.Serial(output_com_port, baudrate, timeout=1) -except serial.SerialException as e: - print(f"Error: {e}") - exit(1) - -while True: - if ser_in.in_waiting > 0: - input_data = ser_in.read(ser_in.in_waiting) - try: - # Decode the data as ASCII and strip extraneous characters - decoded_data = input_data.decode('ascii').strip() - - # Check if the message is "" - if decoded_data == "": - print("Received 'TARE' Command") - ser_out.write((decoded_data + '\r\n').encode('ascii')) - break - - except UnicodeDecodeError: - print("non-ASCII data") - - time.sleep(0.01) - -received_message = "" -while True: - if ser_out.in_waiting > 0: - output_data = ser_out.read(ser_out.in_waiting) - try: - # Decode the data as ASCII and strip extraneous characters - received_message += output_data.decode('ascii').strip() - - # Check if "TARE" is received - if "TARE" in received_message: - print("Received 'TARE' Confirmation") - break - except UnicodeDecodeError: - print("non-ASCII data") - - time.sleep(0.01) - -try: - while True: - if ser_in.in_waiting > 0: - data = ser_in.read(ser_in.in_waiting) - try: - decoded_data = data.decode('ascii').strip() - actual_coord = parse_coord(decoded_data) - if actual_coord: - actual_x, actual_y = actual_coord - print(f"RX COORD: {decoded_data}") - - # Calulate control signal - dir_x, dir_y, theta_mag = pid.calculate(desired_coord, actual_coord) - # else: - # dir_x, dir_y, theta_mag = 0,0,0 - - # Convert unit direction vector and control signal magnitude to motor angles - motor_angles = translate_dir_to_motor_angles(dir_x, dir_y, theta_mag) - - # Re-encode and send the ASCII-decoded data over the output port - motor_output = ascii_encode(motor_angles) - ser_out.write(motor_output) - - # Debug: Confirm data was sent - print(f"PID VECTOR: {(dir_x, dir_y, theta_mag)}") - print(f"TX ANGLE: {motor_output.decode('ascii')}") - - except UnicodeDecodeError: - print("non-ASCII data") - - time.sleep(0.01) - -except KeyboardInterrupt: - print("Keyboard Interrupt") - -finally: - ser_in.close() - ser_out.close() - print("Ports Closed") diff --git a/python/tele-op/serial_utils.py b/python/tele-op/serial_utils.py deleted file mode 100644 index c0bb9cb..0000000 --- a/python/tele-op/serial_utils.py +++ /dev/null @@ -1,124 +0,0 @@ -import serial -import platform -import time -import re -from pid.position_feedback import Controller -from kinematics.wrappers import translate_dir_to_motor_angles -import serial.serialposix - -tare_timeout = 1 -user_timeout = 30 - -def init_serial(baudrate: int): - """Helper function to initialize the required serial ports for tele-op - - Args: - baudrate (int): Serial baudrate - Returns: - motor_serial (Serial): Object representing the serial communication to the motor ATMEGA - joystick_serial (Serial): Object representing the serial communication to the joystick arduino - """ - computeOS = platform.system() - if computeOS == 'Linux': - input_com_port = '/dev/ttyACM0' - output_com_port = '/dev/ttyUSB0' - elif computeOS == 'Windows': - input_com_port = 'COM8' - output_com_port = 'COM5' - - try: - motor_serial = serial.Serial(input_com_port, baudrate, timeout=1) - joystick_serial = serial.Serial(output_com_port, baudrate, timeout=1) - except serial.SerialException as e: - print(f"Error: {e}") - return None - else: - print(f"Serial Initialized") - return motor_serial, joystick_serial - -def send_encoded_motor_commands(motor_serial: serial.Serial, motor_command: str, motor_angles: tuple[float, float, float]): - """Helper function that ASCII encodes a given motor command and set of motor angles in the expected format - - Args: - motor_serial (Serial): Object representing the serial communication to the motor - motor_command (character): Defines the mode of operation - motor_angles (tuple of 3 floats, (motorA, motorB, motorC)): Angles of the motors to be operated on (rad) - """ - motorA, motorB, motorC = motor_angles - motor_output = f"<{motor_command}, {motorA:.2f}, {motorB:.2f}, {motorC:.2f}>".encode('ascii') - - motor_serial.write(motor_output) - -def tare_motors(motor_serial: serial.Serial, zero_angles: tuple[float, float, float]): - """Helper function to send the tare command and waits for confirmation - - Args: - motor_serial (Serial): Object representing the serial communication to the motor - zero_angles (tuple of 3 floats, (motorA, motorB, motorC)): Angles of the motors' current position (rad) - Returns: - State of motor tare operation: True if Tare successful and false if unsuccessful - """ - # Tare motors given a tuple of angles - send_encoded_motor_commands(motor_serial, 't', (zero_angles)) - print("Sent 'TARE' Command") - - # Wait for Tare confirmation to be received - timeout_start = time.time() - received_message = "" - while time.time() < timeout_start + tare_timeout: - if motor_serial.in_waiting > 0: - serial_data = motor_serial.read(motor_serial.in_waiting) - try: - # Decode the data as ASCII and strip extraneous characters - received_message += serial_data.decode('ascii').strip() - # Check if "TARE" is received - if "TARE" in received_message: - print("'TARE' Confirmed") - return True - except UnicodeDecodeError: - print("Received Non-ASCII Data") - time.sleep(0.01) - - print("Timeout Error: 'TARE' Not Confirmed") - return False - -def wait_for_user(joystick_serial: serial.Serial): - """Blocking funciton to wait for user start""" - timeout_start = time.time() - while time.time() < timeout_start + user_timeout: - if joystick_serial.in_waiting > 0: - break - time.sleep(0.01) - print("Timeout Error: No User Input") - -def parse_coord(input_data): - """Helper function to extract coordinate values from Arduino serial""" - # Expected ASCII data in the format - format = re.compile(r"<(-?\d+\.\d+),\s*(-?\d+\.\d+)>") - match = format.match(input_data) - if match: - x = float(match.group(1)) - y = float(match.group(2)) - return x, y - else: - print("Data format error") - return None - -def joystick_decode(joystick_serial: serial.Serial): - if joystick_serial.in_waiting > 0: - data = joystick_serial.read(joystick_serial.in_waiting) - try: - decoded_data = data.decode('ascii').strip() - actual_coord = parse_coord(decoded_data) - if actual_coord: - print(f"RX COORD: {decoded_data}") - return actual_coord - except UnicodeDecodeError: - print("non-ASCII data") - -def close_serial(motor_serial, joystick_serial): - """Helper function to close the serial communications""" - motor_serial.close() - joystick_serial.close() - print("Ports Closed") - \ No newline at end of file