From 399a93361c86082d1d22b069c09456dd07009801 Mon Sep 17 00:00:00 2001 From: Dan Date: Tue, 22 Jun 2021 13:46:10 +1000 Subject: [PATCH] initial comit --- src/huntsman/pocs/focuser/astromechanics.py | 156 ++++------------- src/huntsman/pocs/focuser/birger.py | 179 ++++++-------------- src/huntsman/pocs/focuser/serial.py | 162 ++++++++++++++++++ 3 files changed, 245 insertions(+), 252 deletions(-) create mode 100644 src/huntsman/pocs/focuser/serial.py diff --git a/src/huntsman/pocs/focuser/astromechanics.py b/src/huntsman/pocs/focuser/astromechanics.py index d636d455..5e2b7bad 100644 --- a/src/huntsman/pocs/focuser/astromechanics.py +++ b/src/huntsman/pocs/focuser/astromechanics.py @@ -1,143 +1,53 @@ -import os -import numpy as np - -from panoptes.utils import error -from panoptes.utils.time import current_time - from panoptes.pocs.focuser.astromechanics import Focuser as AstromechFocuser -from panoptes.pocs.utils.plotting import make_autofocus_plot -from huntsman.pocs.utils.focus import AutofocusSequence +from huntsman.pocs.focuser.serial import HuntsmanSerialFocuser -class Focuser(AstromechFocuser): +class Focuser(AstromechFocuser, HuntsmanSerialFocuser): def __init__(self, *args, **kwargs): """Initialize an AbstractSerialMount for the port defined in the config. Opens a connection to the serial device, if it is valid. """ + self._position = None super().__init__(*args, **kwargs) - initial_position = kwargs.get("initial_position", None) - self.logger.debug(f"Initial position for {self}: {initial_position}") - if initial_position is not None: - self.position = initial_position + @HuntsmanSerialFocuser.position.getter + def position(self): + return int(self._position) - def _autofocus(self, *args, **kwargs): - focus_event = kwargs.pop("focus_event") + def move_to(self, new_position): + """ Override to use panoptes utils serial code. """ + self._is_moving = True try: - return self._run_autofocus(*args, **kwargs) + self._send_command(f'M{int(new_position):d}#') + self._position = new_position finally: - if focus_event is not None: - focus_event.set() - - def _run_autofocus(self, seconds, focus_range, focus_step, cutout_size, keep_files=False, - take_dark=True, coarse=False, make_plots=False, max_exposure_retries=3, - **kwargs): - """ - Focuses the camera using the specified merit function. Optionally performs - a coarse focus to find the approximate position of infinity focus, which - should be followed by a fine focus before observing. - Args: - seconds (scalar, optional): Exposure time for focus exposures, if not - specified will use value from config. - focus_range (2-tuple, optional): Coarse & fine focus sweep range, in - encoder units. Specify to override values from config. - focus_step (2-tuple, optional): Coarse & fine focus sweep steps, in - encoder units. Specify to override values from config. - cutout_size (int, optional): Size of square central region of image - to use, default 500 x 500 pixels. - keep_files (bool, optional): If True will keep all images taken - during focusing. If False (default) will delete all except the - first and last images from each focus run. - take_dark (bool, optional): If True will attempt to take a dark frame - before the focus run, and use it for dark subtraction and hot - pixel masking, default True. - coarse (bool, optional): Whether to perform a coarse focus, otherwise will perform - a fine focus. Default False. - make_plots (bool, optional): Whether to write focus plots to images folder. If not - given will fall back on value of `autofocus_make_plots` set on initialisation, - and if it wasn't set then will default to False. - blocking (bool, optional): Whether to block until autofocus complete, default False. - """ - start_time = start_time = current_time(flatten=True) - imagedir = os.path.join(self.camera.get_config('directories.images'), 'focus', - self.camera.uid, start_time) - initial_position = self.position - - # Get focus range - idx = 1 if coarse else 0 - position_step = focus_step[idx] - position_min = initial_position - focus_range[idx] / 2 - position_max = initial_position + focus_range[idx] / 2 - - # Make sequence object - sequence = AutofocusSequence(position_min=position_min, position_max=position_max, - position_step=position_step, bit_depth=self.camera.bit_depth, - **kwargs) - # Add a dark exposure - if take_dark: - self.logger.info(f"Taking dark frame before autofocus on {self}.") - filename = os.path.join(imagedir, f"dark.{self.camera.file_extension}") - cutout = self.camera.get_cutout(seconds, filename, cutout_size, keep_file=keep_files, - dark=True) - sequence.dark_image = cutout - - # Take the focusing exposures - exposure_retries = 0 - while not sequence.is_finished: - self.logger.info(f"Autofocus status on {self}: {sequence.status}") - - new_position = sequence.get_next_position() - - basename = f"{new_position}-{sequence.exposure_idx:02d}.{self.camera.file_extension}" - filename = os.path.join(imagedir, basename) - - # Move the focuser - self.move_to(new_position) - - # Get the exposure cutout - try: - cutout = self.camera.get_cutout(seconds, filename, cutout_size, - keep_file=keep_files) - exposure_retries = 0 # Reset exposure retries - except error.PanError as err: - self.logger.warning(f"Exception encountered in get_cutout on {self}: {err!r}") - - # Abort the sequence if max exposure retries is reached - exposure_retries += 1 - if exposure_retries >= max_exposure_retries: - raise error.PanError(f"Max exposure retries reached during autofocus on" - f" {self}.") - self.logger.warning("Continuing with autofocus sequence after exposure error on" - f" {self}.") - continue - - # Update the sequence - sequence.update(cutout, position=self.position) + # Focuser move commands block until the move is finished, so if the command has + # returned then the focuser is no longer moving. + self._is_moving = False - # Get the best position and move to it - best_position = sequence.best_position - best_position_actual = self.move_to(best_position) - self.logger.info(f"Best focus position for {self}: {best_position}") + self.logger.debug(f"Moved to encoder position {self.position}") + return self.position - if make_plots: - focus_type = "coarse" if coarse else "fine" - plot_filename = os.path.join(imagedir, f'{focus_type}-focus-{self.camera.uid}.png') - plot_title = f'{self} {focus_type} focus at {start_time}' + def move_by(self, *args, **kwargs): + """ Override to set position. """ + self._position = super().move_by(*args, **kwargs) - metrics = sequence.metrics - focus_positions = sequence.positions - merit_function = sequence.merit_function_name + def _send_command(self, command): + """ Override method to use panoptes-utils code. """ + if not self.is_connected: + self.logger.critical(f"Attempt to send command to {self} when not connected!") + return - initial_idx = np.argmin(abs(focus_positions - initial_position)) - initial_cutout = sequence.images[initial_idx] + # Clear the input buffer in case there's anything left over in there. + self._serial.reset_input_buffer() - final_idx = np.argmin(abs(focus_positions - best_position)) - final_cutout = sequence.images[final_idx] + # Send command + self._serial.write(command + '\r') - self.logger.info(f"Writing focus plot for {self} to {plot_filename}.") - make_autofocus_plot(plot_filename, initial_cutout, final_cutout, initial_position, - best_position_actual, focus_positions, metrics, merit_function, - plot_title=plot_title) + return self._serial.read() - return initial_position, best_position + def _move_zero(self): + """ Override to set position. """ + super()._move_zero() + self._position = 0 diff --git a/src/huntsman/pocs/focuser/birger.py b/src/huntsman/pocs/focuser/birger.py index 89b80934..5c2710b8 100644 --- a/src/huntsman/pocs/focuser/birger.py +++ b/src/huntsman/pocs/focuser/birger.py @@ -1,17 +1,10 @@ -""" Modified focuser to reconnect serial port on command error. """ -import os -import numpy as np - from panoptes.utils import error -from panoptes.utils.time import current_time - -from panoptes.pocs.focuser.birger import Focuser as BirgerFocuser -from panoptes.pocs.utils.plotting import make_autofocus_plot -from huntsman.pocs.utils.focus import AutofocusSequence +from huntsman.pocs.focuser.serial import HuntsmanSerialFocuser +from panoptes.pocs.focuser.birger import Focuser as BirgerFocuser, error_pattern, error_messages -class Focuser(BirgerFocuser): +class Focuser(BirgerFocuser, HuntsmanSerialFocuser): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -21,132 +14,60 @@ def reconnect(self): self.__del__() self.connect(port=self.port) - def _send_command(self, *args, **kwargs): - """ Try command, attempt to reconnect on error and send command again. """ - try: - return super()._send_command(*args, **kwargs) - except error.PanError as err: - self.logger.warning(f"Focuser command failed with exception: {err!r}. Retrying after" - " reconnect.") - self.reconnect() - return super()._send_command(*args, **kwargs) - - def _autofocus(self, *args, **kwargs): - focus_event = kwargs.pop("focus_event") - try: - return self._run_autofocus(*args, **kwargs) - finally: - if focus_event is not None: - focus_event.set() - - def _run_autofocus(self, seconds, focus_range, focus_step, cutout_size, keep_files=False, - take_dark=True, coarse=False, make_plots=False, max_exposure_retries=3, - **kwargs): + def _send_command(self, command, *args, **kwargs): """ - Focuses the camera using the specified merit function. Optionally performs - a coarse focus to find the approximate position of infinity focus, which - should be followed by a fine focus before observing. + Sends a command to the focuser adaptor and retrieves the response. Args: - seconds (scalar, optional): Exposure time for focus exposures, if not - specified will use value from config. - focus_range (2-tuple, optional): Coarse & fine focus sweep range, in - encoder units. Specify to override values from config. - focus_step (2-tuple, optional): Coarse & fine focus sweep steps, in - encoder units. Specify to override values from config. - cutout_size (int, optional): Size of square central region of image - to use, default 500 x 500 pixels. - keep_files (bool, optional): If True will keep all images taken - during focusing. If False (default) will delete all except the - first and last images from each focus run. - take_dark (bool, optional): If True will attempt to take a dark frame - before the focus run, and use it for dark subtraction and hot - pixel masking, default True. - coarse (bool, optional): Whether to perform a coarse focus, otherwise will perform - a fine focus. Default False. - make_plots (bool, optional): Whether to write focus plots to images folder. If not - given will fall back on value of `autofocus_make_plots` set on initialisation, - and if it wasn't set then will default to False. - blocking (bool, optional): Whether to block until autofocus complete, default False. + command (string): command string to send (without newline), e.g. 'fa1000', 'pf' + Returns: + list: possibly empty list containing the '\r' terminated lines of the response from the + adaptor. """ - start_time = start_time = current_time(flatten=True) - imagedir = os.path.join(self.camera.get_config('directories.images'), 'focus', - self.camera.uid, start_time) - initial_position = self.position - - # Get focus range - idx = 1 if coarse else 0 - position_step = focus_step[idx] - position_min = max(self.min_position, initial_position - focus_range[idx] / 2) - position_max = min(self.max_position, initial_position + focus_range[idx] / 2) - - # Make sequence object - sequence = AutofocusSequence(position_min=position_min, position_max=position_max, - position_step=position_step, bit_depth=self.camera.bit_depth, - **kwargs) - # Add a dark exposure - if take_dark: - self.logger.info(f"Taking dark frame before autofocus on {self}.") - filename = os.path.join(imagedir, f"dark.{self.camera.file_extension}") - cutout = self.camera.get_cutout(seconds, filename, cutout_size, keep_file=keep_files, - dark=True) - sequence.dark_image = cutout - - # Take the focusing exposures - exposure_retries = 0 - while not sequence.is_finished: - self.logger.info(f"Autofocus status on {self}: {sequence.status}") - - new_position = sequence.get_next_position() + if not self.is_connected: + self.logger.critical("Attempt to send command to {} when not connected!".format(self)) + return - basename = f"{new_position}-{sequence.exposure_idx:02d}.{self.camera.file_extension}" - filename = os.path.join(imagedir, basename) + # Success variable to verify that the command sent is read by the focuser. + success = False - # Move the focuser - self.move_to(new_position) + for i in range(self._max_command_retries): + # Clear the input buffer in case there's anything left over in there. + self._serial.reset_input_buffer() - # Get the exposure cutout - try: - cutout = self.camera.get_cutout(seconds, filename, cutout_size, - keep_file=keep_files) - exposure_retries = 0 # Reset exposure retries - except error.PanError as err: - self.logger.warning(f"Exception encountered in get_cutout on {self}: {err!r}") + # Send the command + self._serial.write(command + '\r') + raw_response = self._serial.read().rstrip().split("\r") - # Abort the sequence if max exposure retries is reached - exposure_retries += 1 - if exposure_retries >= max_exposure_retries: - raise error.PanError(f"Max exposure retries reached during autofocus on" - f" {self}.") - self.logger.warning("Continuing with autofocus sequence after exposure error on" - f" {self}.") + # In verbose mode adaptor will first echo the command + echo = raw_response[0] + if echo != command: + self.logger.warning(f'echo != command: {echo!r} != {command!r}. Retrying command.') continue - # Update the sequence - sequence.update(cutout, position=self.position) - - # Get the best position and move to it - best_position = sequence.best_position - best_position_actual = self.move_to(best_position) - self.logger.info(f"Best focus position for {self}: {best_position}") - - if make_plots: - focus_type = "coarse" if coarse else "fine" - plot_filename = os.path.join(imagedir, f'{focus_type}-focus-{self.camera.uid}.png') - plot_title = f'{self} {focus_type} focus at {start_time}' - - metrics = sequence.metrics - focus_positions = sequence.positions - merit_function = sequence.merit_function_name - - initial_idx = np.argmin(abs(focus_positions - initial_position)) - initial_cutout = sequence.images[initial_idx] - - final_idx = np.argmin(abs(focus_positions - best_position)) - final_cutout = sequence.images[final_idx] - - self.logger.info(f"Writing focus plot for {self} to {plot_filename}.") - make_autofocus_plot(plot_filename, initial_cutout, final_cutout, initial_position, - best_position_actual, focus_positions, metrics, merit_function, - plot_title=plot_title) + # Adaptor should then send 'OK', even if there was an error. + ok = raw_response[1] + if ok != 'OK': + self.logger.warning(f"ok != 'OK': {ok!r} != 'OK'. Retrying command.") + continue - return initial_position, best_position + # Depending on which command was sent there may or may not be any further response. + response = raw_response[2:] + success = True + break + + if not success: + raise error.PanError(f'Failed command {command!r} on {self}') + + # Check for an error message in response + if response: + # Not an empty list. + error_match = error_pattern.match(response[0]) + if error_match: + # Got an error message! Translate it. + try: + error_message = error_messages[int(error_match.group())] + self.logger.error(f"{self} returned error message '{error_message}'!") + except Exception: + self.logger.error(f"Unknown error '{error_match.group()}' from {self}!") + + return response diff --git a/src/huntsman/pocs/focuser/serial.py b/src/huntsman/pocs/focuser/serial.py new file mode 100644 index 00000000..76f1c432 --- /dev/null +++ b/src/huntsman/pocs/focuser/serial.py @@ -0,0 +1,162 @@ +""" Base Huntsman override class. """ +import os +import numpy as np + +from panoptes.utils import error +from panoptes.utils.time import current_time +from panoptes.utils.rs232 import SerialData + +from panoptes.pocs.focuser.serial import AbstractSerialFocuser +from panoptes.pocs.utils.plotting import make_autofocus_plot + +from huntsman.pocs.utils.focus import AutofocusSequence + + +class HuntsmanSerialFocuser(AbstractSerialFocuser): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # Explicitly set the initial position as currently missing from POCS astromech code + initial_position = kwargs.get("initial_position", None) + self.logger.debug(f"Initial position for {self}: {initial_position}") + if initial_position is not None: + self.position = initial_position + + @property + def is_connected(self): + """ Override to use panoptes utils serial code. """ + connected = False + if self._serial: + connected = self._serial.is_connected + return connected + + def _connect(self, port, baudrate): + """ Override the serial device object using panoptes-utils code. """ + self._serial = SerialData(port=port, baudrate=baudrate) + + def _autofocus(self, *args, **kwargs): + focus_event = kwargs.pop("focus_event") + try: + return self._run_autofocus(*args, **kwargs) + finally: + if focus_event is not None: + focus_event.set() + + def _run_autofocus(self, seconds, focus_range, focus_step, cutout_size, keep_files=False, + take_dark=True, coarse=False, make_plots=False, max_exposure_retries=3, + **kwargs): + """ + Focuses the camera using the specified merit function. Optionally performs + a coarse focus to find the approximate position of infinity focus, which + should be followed by a fine focus before observing. + Args: + seconds (scalar, optional): Exposure time for focus exposures, if not + specified will use value from config. + focus_range (2-tuple, optional): Coarse & fine focus sweep range, in + encoder units. Specify to override values from config. + focus_step (2-tuple, optional): Coarse & fine focus sweep steps, in + encoder units. Specify to override values from config. + cutout_size (int, optional): Size of square central region of image + to use, default 500 x 500 pixels. + keep_files (bool, optional): If True will keep all images taken + during focusing. If False (default) will delete all except the + first and last images from each focus run. + take_dark (bool, optional): If True will attempt to take a dark frame + before the focus run, and use it for dark subtraction and hot + pixel masking, default True. + coarse (bool, optional): Whether to perform a coarse focus, otherwise will perform + a fine focus. Default False. + make_plots (bool, optional): Whether to write focus plots to images folder. If not + given will fall back on value of `autofocus_make_plots` set on initialisation, + and if it wasn't set then will default to False. + blocking (bool, optional): Whether to block until autofocus complete, default False. + """ + start_time = start_time = current_time(flatten=True) + imagedir = os.path.join(self.camera.get_config('directories.images'), 'focus', + self.camera.uid, start_time) + initial_position = self.position + + # Get focus range + idx = 1 if coarse else 0 + position_step = focus_step[idx] + position_min = initial_position - focus_range[idx] / 2 + position_max = initial_position + focus_range[idx] / 2 + + # Apply focuser movement boundaries + if self.max_position is not None: + position_max = min(position_max, self.max_position) + if self.min_position is not None: + position_max = max(position_min, self.min_position) + + # Make sequence object + sequence = AutofocusSequence(position_min=position_min, position_max=position_max, + position_step=position_step, bit_depth=self.camera.bit_depth, + **kwargs) + # Add a dark exposure + if take_dark: + self.logger.info(f"Taking dark frame before autofocus on {self}.") + filename = os.path.join(imagedir, f"dark.{self.camera.file_extension}") + cutout = self.camera.get_cutout(seconds, filename, cutout_size, keep_file=keep_files, + dark=True) + sequence.dark_image = cutout + + # Take the focusing exposures + exposure_retries = 0 + while not sequence.is_finished: + self.logger.info(f"Autofocus status on {self}: {sequence.status}") + + new_position = sequence.get_next_position() + + basename = f"{new_position}-{sequence.exposure_idx:02d}.{self.camera.file_extension}" + filename = os.path.join(imagedir, basename) + + # Move the focuser + self.move_to(new_position) + + # Get the exposure cutout + try: + cutout = self.camera.get_cutout(seconds, filename, cutout_size, + keep_file=keep_files) + exposure_retries = 0 # Reset exposure retries + except error.PanError as err: + self.logger.warning(f"Exception encountered in get_cutout on {self}: {err!r}") + + # Abort the sequence if max exposure retries is reached + exposure_retries += 1 + if exposure_retries >= max_exposure_retries: + raise error.PanError(f"Max exposure retries reached during autofocus on" + f" {self}.") + self.logger.warning("Continuing with autofocus sequence after exposure error on" + f" {self}.") + continue + + # Update the sequence + sequence.update(cutout, position=self.position) + + # Get the best position and move to it + best_position = sequence.best_position + best_position_actual = self.move_to(best_position) + self.logger.info(f"Best focus position for {self}: {best_position}") + + if make_plots: + focus_type = "coarse" if coarse else "fine" + plot_filename = os.path.join(imagedir, f'{focus_type}-focus-{self.camera.uid}.png') + plot_title = f'{self} {focus_type} focus at {start_time}' + + metrics = sequence.metrics + focus_positions = sequence.positions + merit_function = sequence.merit_function_name + + initial_idx = np.argmin(abs(focus_positions - initial_position)) + initial_cutout = sequence.images[initial_idx] + + final_idx = np.argmin(abs(focus_positions - best_position)) + final_cutout = sequence.images[final_idx] + + self.logger.info(f"Writing focus plot for {self} to {plot_filename}.") + make_autofocus_plot(plot_filename, initial_cutout, final_cutout, initial_position, + best_position_actual, focus_positions, metrics, merit_function, + plot_title=plot_title) + + return initial_position, best_position