Skip to content

Commit

Permalink
initial comit (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
danjampro authored Jun 22, 2021
1 parent 72b7e02 commit 93732d1
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 252 deletions.
156 changes: 33 additions & 123 deletions src/huntsman/pocs/focuser/astromechanics.py
Original file line number Diff line number Diff line change
@@ -1,143 +1,53 @@
import os
import numpy as np

from panoptes.utils import error
from panoptes.utils.time import current_time

from panoptes.pocs.focuser.astromechanics import Focuser as AstromechFocuser
from panoptes.pocs.utils.plotting import make_autofocus_plot

from huntsman.pocs.utils.focus import AutofocusSequence
from huntsman.pocs.focuser.serial import HuntsmanSerialFocuser


class Focuser(AstromechFocuser):
class Focuser(AstromechFocuser, HuntsmanSerialFocuser):
def __init__(self, *args, **kwargs):
"""Initialize an AbstractSerialMount for the port defined in the config.
Opens a connection to the serial device, if it is valid.
"""
self._position = None
super().__init__(*args, **kwargs)

initial_position = kwargs.get("initial_position", None)
self.logger.debug(f"Initial position for {self}: {initial_position}")
if initial_position is not None:
self.position = initial_position
@HuntsmanSerialFocuser.position.getter
def position(self):
return int(self._position)

def _autofocus(self, *args, **kwargs):
focus_event = kwargs.pop("focus_event")
def move_to(self, new_position):
""" Override to use panoptes utils serial code. """
self._is_moving = True
try:
return self._run_autofocus(*args, **kwargs)
self._send_command(f'M{int(new_position):d}#')
self._position = new_position
finally:
if focus_event is not None:
focus_event.set()

def _run_autofocus(self, seconds, focus_range, focus_step, cutout_size, keep_files=False,
take_dark=True, coarse=False, make_plots=False, max_exposure_retries=3,
**kwargs):
"""
Focuses the camera using the specified merit function. Optionally performs
a coarse focus to find the approximate position of infinity focus, which
should be followed by a fine focus before observing.
Args:
seconds (scalar, optional): Exposure time for focus exposures, if not
specified will use value from config.
focus_range (2-tuple, optional): Coarse & fine focus sweep range, in
encoder units. Specify to override values from config.
focus_step (2-tuple, optional): Coarse & fine focus sweep steps, in
encoder units. Specify to override values from config.
cutout_size (int, optional): Size of square central region of image
to use, default 500 x 500 pixels.
keep_files (bool, optional): If True will keep all images taken
during focusing. If False (default) will delete all except the
first and last images from each focus run.
take_dark (bool, optional): If True will attempt to take a dark frame
before the focus run, and use it for dark subtraction and hot
pixel masking, default True.
coarse (bool, optional): Whether to perform a coarse focus, otherwise will perform
a fine focus. Default False.
make_plots (bool, optional): Whether to write focus plots to images folder. If not
given will fall back on value of `autofocus_make_plots` set on initialisation,
and if it wasn't set then will default to False.
blocking (bool, optional): Whether to block until autofocus complete, default False.
"""
start_time = start_time = current_time(flatten=True)
imagedir = os.path.join(self.camera.get_config('directories.images'), 'focus',
self.camera.uid, start_time)
initial_position = self.position

# Get focus range
idx = 1 if coarse else 0
position_step = focus_step[idx]
position_min = initial_position - focus_range[idx] / 2
position_max = initial_position + focus_range[idx] / 2

# Make sequence object
sequence = AutofocusSequence(position_min=position_min, position_max=position_max,
position_step=position_step, bit_depth=self.camera.bit_depth,
**kwargs)
# Add a dark exposure
if take_dark:
self.logger.info(f"Taking dark frame before autofocus on {self}.")
filename = os.path.join(imagedir, f"dark.{self.camera.file_extension}")
cutout = self.camera.get_cutout(seconds, filename, cutout_size, keep_file=keep_files,
dark=True)
sequence.dark_image = cutout

# Take the focusing exposures
exposure_retries = 0
while not sequence.is_finished:
self.logger.info(f"Autofocus status on {self}: {sequence.status}")

new_position = sequence.get_next_position()

basename = f"{new_position}-{sequence.exposure_idx:02d}.{self.camera.file_extension}"
filename = os.path.join(imagedir, basename)

# Move the focuser
self.move_to(new_position)

# Get the exposure cutout
try:
cutout = self.camera.get_cutout(seconds, filename, cutout_size,
keep_file=keep_files)
exposure_retries = 0 # Reset exposure retries
except error.PanError as err:
self.logger.warning(f"Exception encountered in get_cutout on {self}: {err!r}")

# Abort the sequence if max exposure retries is reached
exposure_retries += 1
if exposure_retries >= max_exposure_retries:
raise error.PanError(f"Max exposure retries reached during autofocus on"
f" {self}.")
self.logger.warning("Continuing with autofocus sequence after exposure error on"
f" {self}.")
continue

# Update the sequence
sequence.update(cutout, position=self.position)
# Focuser move commands block until the move is finished, so if the command has
# returned then the focuser is no longer moving.
self._is_moving = False

# Get the best position and move to it
best_position = sequence.best_position
best_position_actual = self.move_to(best_position)
self.logger.info(f"Best focus position for {self}: {best_position}")
self.logger.debug(f"Moved to encoder position {self.position}")
return self.position

if make_plots:
focus_type = "coarse" if coarse else "fine"
plot_filename = os.path.join(imagedir, f'{focus_type}-focus-{self.camera.uid}.png')
plot_title = f'{self} {focus_type} focus at {start_time}'
def move_by(self, *args, **kwargs):
""" Override to set position. """
self._position = super().move_by(*args, **kwargs)

metrics = sequence.metrics
focus_positions = sequence.positions
merit_function = sequence.merit_function_name
def _send_command(self, command):
""" Override method to use panoptes-utils code. """
if not self.is_connected:
self.logger.critical(f"Attempt to send command to {self} when not connected!")
return

initial_idx = np.argmin(abs(focus_positions - initial_position))
initial_cutout = sequence.images[initial_idx]
# Clear the input buffer in case there's anything left over in there.
self._serial.reset_input_buffer()

final_idx = np.argmin(abs(focus_positions - best_position))
final_cutout = sequence.images[final_idx]
# Send command
self._serial.write(command + '\r')

self.logger.info(f"Writing focus plot for {self} to {plot_filename}.")
make_autofocus_plot(plot_filename, initial_cutout, final_cutout, initial_position,
best_position_actual, focus_positions, metrics, merit_function,
plot_title=plot_title)
return self._serial.read()

return initial_position, best_position
def _move_zero(self):
""" Override to set position. """
super()._move_zero()
self._position = 0
179 changes: 50 additions & 129 deletions src/huntsman/pocs/focuser/birger.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
""" Modified focuser to reconnect serial port on command error. """
import os
import numpy as np

from panoptes.utils import error
from panoptes.utils.time import current_time

from panoptes.pocs.focuser.birger import Focuser as BirgerFocuser
from panoptes.pocs.utils.plotting import make_autofocus_plot

from huntsman.pocs.utils.focus import AutofocusSequence
from huntsman.pocs.focuser.serial import HuntsmanSerialFocuser
from panoptes.pocs.focuser.birger import Focuser as BirgerFocuser, error_pattern, error_messages


class Focuser(BirgerFocuser):
class Focuser(BirgerFocuser, HuntsmanSerialFocuser):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Expand All @@ -21,132 +14,60 @@ def reconnect(self):
self.__del__()
self.connect(port=self.port)

def _send_command(self, *args, **kwargs):
""" Try command, attempt to reconnect on error and send command again. """
try:
return super()._send_command(*args, **kwargs)
except error.PanError as err:
self.logger.warning(f"Focuser command failed with exception: {err!r}. Retrying after"
" reconnect.")
self.reconnect()
return super()._send_command(*args, **kwargs)

def _autofocus(self, *args, **kwargs):
focus_event = kwargs.pop("focus_event")
try:
return self._run_autofocus(*args, **kwargs)
finally:
if focus_event is not None:
focus_event.set()

def _run_autofocus(self, seconds, focus_range, focus_step, cutout_size, keep_files=False,
take_dark=True, coarse=False, make_plots=False, max_exposure_retries=3,
**kwargs):
def _send_command(self, command, *args, **kwargs):
"""
Focuses the camera using the specified merit function. Optionally performs
a coarse focus to find the approximate position of infinity focus, which
should be followed by a fine focus before observing.
Sends a command to the focuser adaptor and retrieves the response.
Args:
seconds (scalar, optional): Exposure time for focus exposures, if not
specified will use value from config.
focus_range (2-tuple, optional): Coarse & fine focus sweep range, in
encoder units. Specify to override values from config.
focus_step (2-tuple, optional): Coarse & fine focus sweep steps, in
encoder units. Specify to override values from config.
cutout_size (int, optional): Size of square central region of image
to use, default 500 x 500 pixels.
keep_files (bool, optional): If True will keep all images taken
during focusing. If False (default) will delete all except the
first and last images from each focus run.
take_dark (bool, optional): If True will attempt to take a dark frame
before the focus run, and use it for dark subtraction and hot
pixel masking, default True.
coarse (bool, optional): Whether to perform a coarse focus, otherwise will perform
a fine focus. Default False.
make_plots (bool, optional): Whether to write focus plots to images folder. If not
given will fall back on value of `autofocus_make_plots` set on initialisation,
and if it wasn't set then will default to False.
blocking (bool, optional): Whether to block until autofocus complete, default False.
command (string): command string to send (without newline), e.g. 'fa1000', 'pf'
Returns:
list: possibly empty list containing the '\r' terminated lines of the response from the
adaptor.
"""
start_time = start_time = current_time(flatten=True)
imagedir = os.path.join(self.camera.get_config('directories.images'), 'focus',
self.camera.uid, start_time)
initial_position = self.position

# Get focus range
idx = 1 if coarse else 0
position_step = focus_step[idx]
position_min = max(self.min_position, initial_position - focus_range[idx] / 2)
position_max = min(self.max_position, initial_position + focus_range[idx] / 2)

# Make sequence object
sequence = AutofocusSequence(position_min=position_min, position_max=position_max,
position_step=position_step, bit_depth=self.camera.bit_depth,
**kwargs)
# Add a dark exposure
if take_dark:
self.logger.info(f"Taking dark frame before autofocus on {self}.")
filename = os.path.join(imagedir, f"dark.{self.camera.file_extension}")
cutout = self.camera.get_cutout(seconds, filename, cutout_size, keep_file=keep_files,
dark=True)
sequence.dark_image = cutout

# Take the focusing exposures
exposure_retries = 0
while not sequence.is_finished:
self.logger.info(f"Autofocus status on {self}: {sequence.status}")

new_position = sequence.get_next_position()
if not self.is_connected:
self.logger.critical("Attempt to send command to {} when not connected!".format(self))
return

basename = f"{new_position}-{sequence.exposure_idx:02d}.{self.camera.file_extension}"
filename = os.path.join(imagedir, basename)
# Success variable to verify that the command sent is read by the focuser.
success = False

# Move the focuser
self.move_to(new_position)
for i in range(self._max_command_retries):
# Clear the input buffer in case there's anything left over in there.
self._serial.reset_input_buffer()

# Get the exposure cutout
try:
cutout = self.camera.get_cutout(seconds, filename, cutout_size,
keep_file=keep_files)
exposure_retries = 0 # Reset exposure retries
except error.PanError as err:
self.logger.warning(f"Exception encountered in get_cutout on {self}: {err!r}")
# Send the command
self._serial.write(command + '\r')
raw_response = self._serial.read().rstrip().split("\r")

# Abort the sequence if max exposure retries is reached
exposure_retries += 1
if exposure_retries >= max_exposure_retries:
raise error.PanError(f"Max exposure retries reached during autofocus on"
f" {self}.")
self.logger.warning("Continuing with autofocus sequence after exposure error on"
f" {self}.")
# In verbose mode adaptor will first echo the command
echo = raw_response[0]
if echo != command:
self.logger.warning(f'echo != command: {echo!r} != {command!r}. Retrying command.')
continue

# Update the sequence
sequence.update(cutout, position=self.position)

# Get the best position and move to it
best_position = sequence.best_position
best_position_actual = self.move_to(best_position)
self.logger.info(f"Best focus position for {self}: {best_position}")

if make_plots:
focus_type = "coarse" if coarse else "fine"
plot_filename = os.path.join(imagedir, f'{focus_type}-focus-{self.camera.uid}.png')
plot_title = f'{self} {focus_type} focus at {start_time}'

metrics = sequence.metrics
focus_positions = sequence.positions
merit_function = sequence.merit_function_name

initial_idx = np.argmin(abs(focus_positions - initial_position))
initial_cutout = sequence.images[initial_idx]

final_idx = np.argmin(abs(focus_positions - best_position))
final_cutout = sequence.images[final_idx]

self.logger.info(f"Writing focus plot for {self} to {plot_filename}.")
make_autofocus_plot(plot_filename, initial_cutout, final_cutout, initial_position,
best_position_actual, focus_positions, metrics, merit_function,
plot_title=plot_title)
# Adaptor should then send 'OK', even if there was an error.
ok = raw_response[1]
if ok != 'OK':
self.logger.warning(f"ok != 'OK': {ok!r} != 'OK'. Retrying command.")
continue

return initial_position, best_position
# Depending on which command was sent there may or may not be any further response.
response = raw_response[2:]
success = True
break

if not success:
raise error.PanError(f'Failed command {command!r} on {self}')

# Check for an error message in response
if response:
# Not an empty list.
error_match = error_pattern.match(response[0])
if error_match:
# Got an error message! Translate it.
try:
error_message = error_messages[int(error_match.group())]
self.logger.error(f"{self} returned error message '{error_message}'!")
except Exception:
self.logger.error(f"Unknown error '{error_match.group()}' from {self}!")

return response
Loading

0 comments on commit 93732d1

Please sign in to comment.