From 40d49f94dd0599a26510859ad43a4fdec92f162a Mon Sep 17 00:00:00 2001 From: Wasserstoff <41219647+GermanHydrogen@users.noreply.github.com> Date: Tue, 20 Feb 2024 10:02:08 +0100 Subject: [PATCH] NStarOffsets testing & refactor (#338) * Added some tests to nstar offsets * Added missing basic unit tests to nstar offsets * Removed unnecessary code * Seperated theausssian fit from nstar oss * Fixed docstrings * Fixed default fit result on fit failure * Removed duplication in tests * Added box generator to seperate initial box definition from nstar class * Simplified remove bad sources * Added some clarifications to box generator * Integrated box generator with nstar * Refactored nstar offset calculation * Added missing testcase and None handling * Changed nstars max offset to pixels instead of arc seconds * Seperated the source filtering in box generator into a seperate image processor * Added box overlapping check to box generator * Simplified overlap checking in box generator * Changed background filter calculation to weber contrast * Added docstring to image source filter * Added comment to nstar docstring --- pyobs/images/processors/misc/__init__.py | 1 + .../processors/misc/image_source_filter.py | 128 ++++++ pyobs/images/processors/offsets/__init__.py | 2 +- pyobs/images/processors/offsets/nstar.py | 400 ------------------ .../processors/offsets/nstar/__init__.py | 0 .../offsets/nstar/_box_generator.py | 47 ++ .../offsets/nstar/_gaussian_fitter.py | 119 ++++++ .../images/processors/offsets/nstar/nstar.py | 135 ++++++ .../misc/test_image_source_filter.py | 67 +++ .../test_source_catalog.py | 0 .../processors/offsets/nstar/__init__.py | 0 .../offsets/nstar/test_box_generator.py | 62 +++ .../offsets/nstar/test_gaussianfitter.py | 45 ++ .../processors/offsets/nstar/test_nstar.py | 107 +++++ 14 files changed, 712 insertions(+), 401 deletions(-) create mode 100644 pyobs/images/processors/misc/image_source_filter.py delete mode 100644 pyobs/images/processors/offsets/nstar.py create mode 100644 pyobs/images/processors/offsets/nstar/__init__.py create mode 100644 pyobs/images/processors/offsets/nstar/_box_generator.py create mode 100644 pyobs/images/processors/offsets/nstar/_gaussian_fitter.py create mode 100644 pyobs/images/processors/offsets/nstar/nstar.py create mode 100644 tests/images/processors/misc/test_image_source_filter.py rename tests/images/processors/{detection => misc}/test_source_catalog.py (100%) create mode 100644 tests/images/processors/offsets/nstar/__init__.py create mode 100644 tests/images/processors/offsets/nstar/test_box_generator.py create mode 100644 tests/images/processors/offsets/nstar/test_gaussianfitter.py create mode 100644 tests/images/processors/offsets/nstar/test_nstar.py diff --git a/pyobs/images/processors/misc/__init__.py b/pyobs/images/processors/misc/__init__.py index 1b0965f8..a4cc8cd3 100644 --- a/pyobs/images/processors/misc/__init__.py +++ b/pyobs/images/processors/misc/__init__.py @@ -10,3 +10,4 @@ from .smooth import Smooth from .softbin import SoftBin from .broadcast import Broadcast +from .image_source_filter import ImageSourceFilter diff --git a/pyobs/images/processors/misc/image_source_filter.py b/pyobs/images/processors/misc/image_source_filter.py new file mode 100644 index 00000000..fea41b87 --- /dev/null +++ b/pyobs/images/processors/misc/image_source_filter.py @@ -0,0 +1,128 @@ +from copy import copy +from typing import Tuple, Any + +import numpy as np +from astropy.table import Table + +from pyobs.images import ImageProcessor, Image + + +class ImageSourceFilter(ImageProcessor): + def __init__(self, + min_dist_to_border: float, + num_stars: int, + min_pixels: int, + max_ellipticity: float = 0.4, + min_weber_contrast: float = 1.5, + max_saturation: int = 50000) -> None: + """ + Filters the source table after pysep detection has run + Args: + min_dist_to_border: Minimal distance to the image border + num_stars: Number of sources to take + min_pixels: Minimum required amount of pixels of a source + max_ellipticity: Maximum allowed ellipticity of a source + min_weber_contrast: Minimum required weber contrast of a source (relative to the background) + max_saturation: + """ + + super().__init__() + + self._min_dist_to_border = min_dist_to_border + self._num_stars = num_stars + self._min_pixels = min_pixels + self._max_ellipticity = max_ellipticity + self._min_weber_contrast = min_weber_contrast + self._max_saturation = max_saturation + + async def __call__(self, image: Image) -> Image: + working_image = copy(image) + sources_copy = working_image.catalog.copy() + + valid_sources = self.remove_sources_close_to_border(sources_copy, working_image.data.shape) + good_sources = self.remove_bad_sources(valid_sources) + selected_sources = self._select_brightest_sources(good_sources) + + working_image.catalog = selected_sources + + return working_image + + @staticmethod + def _fits2numpy(sources: Table) -> Table: + """Convert from FITS to numpy conventions for pixel coordinates.""" + for k in ["x", "y", "xmin", "xmax", "ymin", "ymax", "xpeak", "ypeak"]: + if k in sources.keys(): + sources[k] -= 1 + return sources + + def remove_sources_close_to_border(self, sources: Table, image_shape: Tuple[int, int]) -> Table: + """Remove table rows from sources when source is closer than given distance from border of image. + + Args: + sources: Input table. + image_shape: Shape of image. + + Returns: + Filtered table. + .""" + + width, height = image_shape + + x_dist_from_border = width / 2 - np.abs(sources["y"] - width / 2) + y_dist_from_border = height / 2 - np.abs(sources["x"] - height / 2) + + min_dist_from_border = np.minimum(x_dist_from_border, y_dist_from_border) + sources_result = sources[min_dist_from_border > self._min_dist_to_border] + + return sources_result + + def remove_bad_sources( + self, sources: Table, + ) -> Table: + """Remove bad sources from table. + + Args: + sources: Input table. + + Returns: + Filtered table. + """ + + saturated_sources = sources["peak"] >= self._max_saturation + + small_sources = sources["tnpix"] < self._min_pixels + + tnpix_median = np.median(sources["tnpix"]) + tnpix_std = np.std(sources["tnpix"]) + large_sources = sources["tnpix"] > tnpix_median + 2 * tnpix_std + + elliptic_sources = sources["ellipticity"] > self._max_ellipticity + + background_sources = sources["background"] <= 0 + + low_contrast_sources = self._calc_weber_contrast(sources["peak"], sources["background"]) <= self._min_weber_contrast + + bad_sources = saturated_sources | small_sources | large_sources | elliptic_sources | background_sources | low_contrast_sources + + filtered_sources = sources[~bad_sources] # keep sources that are not bad + return filtered_sources + + @staticmethod + def _calc_weber_contrast(peak: np.ndarray[Any, Any], background: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]: + return (peak - background)/background + + def _select_brightest_sources(self, sources: Table) -> Table: + """Select the N brightest sources from table. + + Args: + sources: Source table. + + Returns: + table containing the N brightest sources. + """ + + sources.sort("flux", reverse=True) + + if 0 < self._num_stars < len(sources): + sources = sources[:self._num_stars] + return sources diff --git a/pyobs/images/processors/offsets/__init__.py b/pyobs/images/processors/offsets/__init__.py index 1ff322cf..0437e335 100644 --- a/pyobs/images/processors/offsets/__init__.py +++ b/pyobs/images/processors/offsets/__init__.py @@ -6,7 +6,7 @@ from .offsets import Offsets from .astrometry import AstrometryOffsets from .brighteststar import BrightestStarOffsets -from .nstar import NStarOffsets +from .nstar.nstar import NStarOffsets from .projected import ProjectedOffsets from .fitsheader import FitsHeaderOffsets from .dummyoffsets import DummyOffsets diff --git a/pyobs/images/processors/offsets/nstar.py b/pyobs/images/processors/offsets/nstar.py deleted file mode 100644 index 33ce788b..00000000 --- a/pyobs/images/processors/offsets/nstar.py +++ /dev/null @@ -1,400 +0,0 @@ -import logging -from typing import Tuple, List, Union, Dict, Any, Optional -import numpy as np -import pandas as pd -from numpy.typing import NDArray -from scipy import signal, optimize -from astropy.nddata import NDData -from astropy.table import Table, Column -import photutils - -from pyobs.images import Image, ImageProcessor -from pyobs.mixins.pipeline import PipelineMixin -from pyobs.images.meta import PixelOffsets -from .offsets import Offsets - -log = logging.getLogger(__name__) - - -class CorrelationMaxCloseToBorderError(Exception): - pass - - -class NStarOffsets(Offsets, PipelineMixin): - """An offset-calculation method based on comparing 2D images of the surroundings of a variable number of stars.""" - - def __init__( - self, - num_stars: int = 10, - max_offset: float = 4.0, - min_pixels: int = 3, - min_sources: int = 1, - pipeline: Optional[List[Union[Dict[str, Any], ImageProcessor]]] = None, - **kwargs: Any, - ): - """Initializes a new auto guiding system. - - Requires pyobs.images.processors.detection.SepSourceDetection and - pyobs.images.processors.photometry.SepPhotometry to be run on the image beforehand. - - Args: - num_stars: maximum number of stars to use to calculate offset from boxes around them - max_offset: the maximal expected offset in arc seconds. Determines the size of boxes - around stars. - min_pixels: minimum required number of pixels above threshold for source to be - used for offset calculation. - min_sources: Minimum required number of sources in image. - pipeline: Pipeline to be used for first image in series. - """ - Offsets.__init__(self, **kwargs) - PipelineMixin.__init__(self, pipeline) - - # store - self.num_stars = num_stars - self.max_offset = max_offset - self.min_pixels = min_pixels - self.min_sources = min_sources - self.ref_boxes: List[Any] = [] - - async def reset(self) -> None: - """Resets guiding.""" - log.info("Reset auto-guiding.") - self.ref_boxes = [] - - async def __call__(self, image: Image) -> Image: - """Processes an image and sets x/y pixel offset to reference in offset attribute. - - Args: - image: Image to process. - - Returns: - Original image. - - Raises: - ValueError: If offset could not be found. - """ - - # no reference image? - if len(self.ref_boxes) == 0: - log.info("Initialising nstar auto-guiding with new image...") - star_box_size = max(5, self._get_box_size(self.max_offset, image.pixel_scale)) - log.info(f"Choosing box size of {star_box_size} pixels.") - - # initialize reference image information - try: - # get boxes - self.ref_boxes = await self._boxes_from_ref(image, star_box_size) - - # reset and finish - image.set_meta(PixelOffsets(0.0, 0.0)) - return image - - except ValueError as e: - # didn't work - log.warning(f"Could not initialize reference image info due to exception '{e}'. Resetting...") - await self.reset() - if PixelOffsets in image.meta: - del image.meta[PixelOffsets] - self.offset = None, None - return image - - # process it - log.info("Perform auto-guiding on new image...") - offsets = self._calculate_offsets(image) - if offsets[0] is not None: - image.set_meta(PixelOffsets(offsets[0], offsets[1])) - return image - - @staticmethod - def _get_box_size(max_expected_offset_in_arcsec, pixel_scale) -> int: - # multiply by 4 to give enough space for fit of correlation around the peak on all sides - return int(4 * max_expected_offset_in_arcsec / pixel_scale if pixel_scale else 20) - - async def _boxes_from_ref(self, image: Image, star_box_size: int) -> List: - """Calculate the boxes around self.N_stars best sources in the image. - - Args: - image: Image to process - - Returns: - 2-tuple with - list of dimensions of boxes in "numpy" order: [0'th axis min, 0'th axis max, 1st axis min, 1st axis max] - list of images of those boxes - - Raises: - ValueError if not at least max(self.min_required_sources_in_image, self.N_stars) in filtered list of sources - """ - - # run pipeline on 1st image - img = await self.run_pipeline(image) - - # do photometry and get catalog - sources = self._fits2numpy(img.catalog) - - # filter sources - sources = self.remove_sources_close_to_border(sources, img.data.shape, star_box_size // 2 + 1) - sources = self.remove_bad_sources(sources) - self._check_sources_count(sources) - selected_sources = self._select_brightest_sources(self.num_stars, sources) - - # extract boxes - return photutils.psf.extract_stars( - NDData(img.data.astype(float)), selected_sources, size=star_box_size - ).all_stars - - @staticmethod - def _fits2numpy(sources: Table) -> Table: - """Convert from FITS to numpy conventions for pixel coordinates.""" - for k in ["x", "y", "xmin", "xmax", "ymin", "ymax", "xpeak", "ypeak"]: - if k in sources.keys(): - sources[k] -= 1 - return sources - - @staticmethod - def remove_sources_close_to_border(sources: Table, image_shape: tuple, min_dist) -> Table: - """Remove table rows from sources when source is closer than given distance from border of image. - - Args: - sources: Input table. - image_shape: Shape of image. - min_dist: Minimum distance from border in pixels. - - Returns: - Filtered table. - .""" - - # get shape - width, height = image_shape - - def min_distance_from_border(source: pd.DataFrame) -> List[float]: - # calculate the minimum distance of source to any image border (across x and y) - return [ - min(x, y) - for x, y in zip( - width / 2 - np.abs(source["y"] - width / 2), height / 2 - np.abs(source["x"] - height / 2) - ) - ] - - sources.add_column(Column(name="min_dist", data=min_distance_from_border(sources))) - sources.sort("min_dist") - - sources_result = sources[np.where(sources["min_dist"] > min_dist)] - return sources_result - - def remove_bad_sources( - self, sources: Table, max_ellipticity: float = 0.4, min_bkg_factor: float = 1.5, saturation: int = 50000 - ) -> Table: - """Remove bad sources from table. - - Args: - sources: Input table. - max_ellipticity: Maximum ellipticity. - min_bkg_factor: Minimum factor above local background. - saturation: Saturation level. - - Returns: - Filtered table. - """ - - # remove saturated sources - sources = sources[sources["peak"] < saturation] - - # remove small sources - sources = sources[np.where(sources["tnpix"] >= self.min_pixels)] - - # remove large sources - tnpix_median = np.median(sources["tnpix"]) - tnpix_std = np.std(sources["tnpix"]) - sources = sources[np.where(sources["tnpix"] <= tnpix_median + 2 * tnpix_std)] - - # remove highly elliptic sources - sources.sort("ellipticity") - sources = sources[np.where(sources["ellipticity"] <= max_ellipticity)] - - # remove sources with background <= 0 - sources = sources[np.where(sources["background"] > 0)] - - # remove sources with low contrast to background - sources = sources[np.where((sources["peak"] + sources["background"]) / sources["background"] > min_bkg_factor)] - return sources - - @staticmethod - def _select_brightest_sources(num_stars: int, sources: Table) -> Table: - """Select N brightest sources from table. - - Args: - num_stars: Maximum number of stars to select. - sources: Source table. - - Returns: - New table containing N brightest sources. - """ - - # sort by flux. - sources.sort("flux") - sources.reverse() - - # extract - if 0 < num_stars < len(sources): - sources = sources[:num_stars] - return sources - - def _check_sources_count(self, sources: Table) -> None: - """Check if enough sources in table. - - Args: - sources: astropy table of sources to check. - - Returns: - None - - Raises: - ValueError if not at least self.min_sources in sources table - - """ - n_required_sources = self.min_sources - if len(sources) < n_required_sources: - raise ValueError(f"Only {len(sources)} source(s) in image, but at least {n_required_sources} required.") - - def _calculate_offsets(self, image: Image) -> Tuple[Optional[float], Optional[float]]: - """Calculate offsets of given image to ref image for every star. - - Args: - image: Image to calculate offset for. - - Returns: - Offset tuple. - """ - - # data? - if image.safe_data is None: - return None, None - - # calculate offset for each star - offsets = [] - for box in self.ref_boxes: - # get box dimensions - box_ymin, box_ymax = box.origin[1], box.origin[1] + box.data.shape[0] - box_xmin, box_xmax = box.origin[0], box.origin[0] + box.data.shape[1] - - # extract box image - current_boxed_image = image.data[box_ymin:box_ymax, box_xmin:box_xmax].astype(float) - - # correlate - corr = signal.correlate2d(current_boxed_image, box.data, mode="same", boundary="wrap") - - try: - offset = self._offsets_from_corr(corr) - offsets.append(offset) - except Exception as e: - log.info(f"Exception '{e}' caught. Ignoring this star.") - pass - - if len(offsets) == 0: - log.info(f"All {self.num_stars} fits on boxed star correlations failed.") - return None, None - offsets_np = np.array(offsets) - return float(np.mean(offsets_np[:, 0])), float(np.mean(offsets_np[:, 1])) - - @staticmethod - def _gauss2d( - x: NDArray[float], a: float, b: float, x0: float, y0: float, sigma_x: float, sigma_y: float - ) -> NDArray[float]: - """2D Gaussian function.""" - return a + b * np.exp(-((x[0] - x0) ** 2) / (2 * sigma_x**2) - (x[1] - y0) ** 2 / (2 * sigma_y**2)) - - def _offsets_from_corr(self, corr: NDArray[float]) -> Tuple[float, float]: - """Fit 2d correlation data with a 2d gaussian + constant offset. - raise CorrelationMaxCloseToBorderError if the correlation maximum is not well separated from border.""" - - # check if maximum of correlation is too close to border - self._check_corr_border(corr) - - # get x,y positions array corresponding to the independent variable values of the correlation - x, y = NStarOffsets._corr_grid(corr) - - # shape data as needed by R^2 -> R scipy curve_fit - xdata = np.vstack((x.ravel(), y.ravel())) - ydata = corr.ravel() - - # estimate initial parameter values - # constant offset of 2d gaussian - a = np.min(corr) - - # height of 2d gaussian - b = np.max(corr) - a - - # gaussian peak position (estimate from maximum pixel position in correlation) - max_index = np.array(np.unravel_index(np.argmax(corr), corr.shape)) - x0, y0 = x[tuple(max_index)], y[tuple(max_index)] - - # estimate width of 2d gaussian as radius of area with values above half maximum - half_max = np.max(corr - a) / 2 + a - - # sum over binary array - greater_than_half_max_area = np.sum(corr >= half_max) - sigma_x = np.sqrt(greater_than_half_max_area / np.pi) - sigma_y = sigma_x - - # initial value list - p0 = [a, b, x0, y0, sigma_x, sigma_y] - bounds = ( - [-np.inf, -np.inf, x0 - sigma_x, y0 - sigma_y, 0, 0], - [np.inf, np.inf, x0 + sigma_x, y0 + sigma_y, np.inf, np.inf], - ) - - # only use data points that clearly belong to peak to avoid border effects - # mask_value_above_background = ydata > -1e5 # a + .1*b - mask_circle_around_peak = (x.ravel() - x0) ** 2 + (y.ravel() - y0) ** 2 < 4 * (sigma_x**2 + sigma_y**2) / 2 - mask = mask_circle_around_peak - ydata_restricted = ydata[mask] - xdata_restricted = xdata[:, mask] - - # do fit - try: - popt, pcov = optimize.curve_fit(self._gauss2d, xdata_restricted, ydata_restricted, p0, bounds=bounds) - except Exception as e: - # if fit fails return max pixel - log.info("Returning pixel position with maximal value in correlation.") - idx = np.unravel_index(np.argmax(corr), corr.shape) - return float(idx[0]), float(idx[1]) - - # check quality of fit - median_squared_relative_residue_threshold = 1e-1 - fit_ydata_restricted = self._gauss2d(xdata_restricted, *popt) - square_rel_res = np.square((fit_ydata_restricted - ydata_restricted) / fit_ydata_restricted) - median_squared_rel_res = np.median(np.square(square_rel_res)) - - if median_squared_rel_res > median_squared_relative_residue_threshold: - raise Exception( - f"Bad fit with median squared relative residue = {median_squared_rel_res}" - f" vs allowed value of {median_squared_relative_residue_threshold}" - ) - - return popt[2], popt[3] - - @staticmethod - def _corr_grid(corr: NDArray[float]) -> NDArray[float]: - """Create x/y grid for given 2D correlation.""" - xs = np.arange(-corr.shape[0] / 2, corr.shape[0] / 2) + 0.5 - ys = np.arange(-corr.shape[1] / 2, corr.shape[1] / 2) + 0.5 - return np.meshgrid(xs, ys) - - @staticmethod - def _check_corr_border(corr: NDArray[float]) -> None: - """Check whether maximum of correlation is too close to border.""" - - corr_size = corr.shape[0] - x, y = NStarOffsets._corr_grid(corr) - - max_index = np.array(np.unravel_index(np.argmax(corr), corr.shape)) - x0, y0 = x[tuple(max_index)], y[tuple(max_index)] - - if x0 < -corr_size / 4 or x0 > corr_size / 4 or y0 < -corr_size / 4 or y0 > corr_size / 4: - raise CorrelationMaxCloseToBorderError( - "Maximum of correlation is outside center half of axes. " - "This means that either the given image data is bad, or the offset is larger than expected." - ) - - -__all__ = ["NStarOffsets"] diff --git a/pyobs/images/processors/offsets/nstar/__init__.py b/pyobs/images/processors/offsets/nstar/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pyobs/images/processors/offsets/nstar/_box_generator.py b/pyobs/images/processors/offsets/nstar/_box_generator.py new file mode 100644 index 00000000..94ab08c3 --- /dev/null +++ b/pyobs/images/processors/offsets/nstar/_box_generator.py @@ -0,0 +1,47 @@ +import itertools +from typing import List, Any + +import numpy as np +import photutils +from astropy.nddata import NDData +from astropy.table import Table +from photutils.psf import EPSFStar + +from pyobs.images import Image + + +class _BoxGenerator(object): + def __init__(self, max_offset: float, min_sources: int) -> None: + self._box_size = self._max_offset_to_box_size(max_offset) + self._min_sources = min_sources + + @staticmethod + def _max_offset_to_box_size(max_offset: float) -> int: + box_size = 2 * max_offset + 1 # photutils.psf.extract_stars only accepts uneven box sizes + return int(box_size) + + def __call__(self, image: Image) -> List[EPSFStar]: + sources = image.catalog + self._check_sources_count(sources) + + # extract boxes + boxes = photutils.psf.extract_stars( + NDData(image.data.astype(float)), sources, size=self._box_size + ).all_stars + + self._check_overlapping_boxes(boxes) + return boxes + + def _check_sources_count(self, sources: Table) -> None: + if len(sources) < self._min_sources: + raise ValueError(f"Only {len(sources)} source(s) in image, but at least {self._min_sources} required.") + + def _check_overlapping_boxes(self, boxes: List[EPSFStar]) -> None: + for (box_1, box_2) in itertools.combinations(boxes, 2): + self._check_overlapping_box_pair(box_1.center, box_2.center) + + def _check_overlapping_box_pair(self, box_center_1: np.ndarray[Any, Any], box_center_2: np.ndarray[Any, Any]) -> None: + dist_2d = np.abs(np.subtract(box_center_1, box_center_2)) + + if dist_2d[0] < self._box_size / 2 or dist_2d[1] < self._box_size / 2: + raise ValueError("Boxes are overlapping!") diff --git a/pyobs/images/processors/offsets/nstar/_gaussian_fitter.py b/pyobs/images/processors/offsets/nstar/_gaussian_fitter.py new file mode 100644 index 00000000..996a110d --- /dev/null +++ b/pyobs/images/processors/offsets/nstar/_gaussian_fitter.py @@ -0,0 +1,119 @@ +import logging +from typing import Tuple, Any + +import numpy as np +from scipy import optimize +from scipy.optimize import OptimizeWarning + +log = logging.getLogger(__name__) + + +class GaussianFitter(object): + + @staticmethod + def offsets_from_corr(corr: np.ndarray[float]) -> Tuple[float, float]: + """Fit 2d correlation data with a 2d gaussian + constant offset. + raise CorrelationMaxCloseToBorderError if the correlation maximum is not well separated from border.""" + + xdata_restricted, ydata_restricted, p0, bounds = GaussianFitter._init_fit(corr) + + try: + fit_result, _ = optimize.curve_fit(GaussianFitter._gauss2d, xdata_restricted, ydata_restricted, p0, bounds=bounds) + except (ValueError, RuntimeError, OptimizeWarning): + + log.info("Returning pixel position with maximal value in correlation.") + return p0[2], p0[3] + + GaussianFitter._check_fit_quality(xdata_restricted, ydata_restricted, fit_result) + + return fit_result[2], fit_result[3] + + @staticmethod + def _init_fit(corr: np.ndarray[float]) -> tuple[ + np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[float]], tuple[float, float, float, float, float, float], tuple[ + tuple[float, float, float, float, int, int], tuple[float, float, float, float, float, float]]]: + # get x,y positions array corresponding to the independent variable values of the correlation + x, y = GaussianFitter._corr_grid(corr) + + # gaussian peak position (estimate from maximum pixel position in correlation) + max_index = np.array(np.unravel_index(np.argmax(corr), corr.shape)) + peak_x, peak_y = x[tuple(max_index)], y[tuple(max_index)] + + # check if maximum of correlation is too close to border + GaussianFitter._check_peak_border_distance(corr, peak_x, peak_y) + + # estimate initial parameter values + # constant offset of 2d gaussian + background = np.min(corr) + + # height of 2d gaussian + peak_height = np.max(corr) - background + + # estimate width of 2d gaussian as radius of area with values above half maximum + half_max = np.max(corr - background) / 2 + background + + # sum over binary array + greater_than_half_max_area = np.sum(corr >= half_max) + sigma_x = np.sqrt(greater_than_half_max_area / np.pi) + sigma_y = sigma_x + + # initial value list + p0 = (background, peak_height, peak_x, peak_y, sigma_x, sigma_y) + bounds = ( + (-np.inf, -np.inf, peak_x - sigma_x, peak_y - sigma_y, 0, 0), + (np.inf, np.inf, peak_x + sigma_x, peak_y + sigma_y, np.inf, np.inf), + ) + + # shape data as needed by R^2 -> R scipy curve_fit + xdata = np.vstack((x.ravel(), y.ravel())) + ydata = corr.ravel() + + # only use data points that clearly belong to peak to avoid border effects + # mask_value_above_background = ydata > -1e5 # a + .1*b + mask_circle_around_peak = (x.ravel() - peak_x) ** 2 + (y.ravel() - peak_y) ** 2 < 4 * ( + sigma_x ** 2 + sigma_y ** 2) / 2 + mask = mask_circle_around_peak + ydata_restricted = ydata[mask] + xdata_restricted = xdata[:, mask] + + return xdata_restricted, ydata_restricted, p0, bounds + + @staticmethod + def _corr_grid(corr: np.ndarray[float]) -> np.ndarray[float]: + """Create x/y grid for given 2D correlation.""" + xs = np.arange(-corr.shape[0] / 2, corr.shape[0] / 2) + 0.5 + ys = np.arange(-corr.shape[1] / 2, corr.shape[1] / 2) + 0.5 + return np.meshgrid(xs, ys) + + @staticmethod + def _check_peak_border_distance(corr: np.ndarray[float], peak_x: float, peak_y: float) -> None: + """Check whether maximum of correlation is too close to border.""" + + corr_size = corr.shape[0] + + if peak_x < -corr_size / 4 or peak_x > corr_size / 4 or peak_y < -corr_size / 4 or peak_y > corr_size / 4: + raise Exception( + "Maximum of correlation is outside center half of axes. " + "This means that either the given image data is bad, or the offset is larger than expected." + ) + + @staticmethod + def _gauss2d( + x: np.ndarray[float], a: float, b: float, x0: float, y0: float, sigma_x: float, sigma_y: float + ) -> np.ndarray[float]: + """2D Gaussian function.""" + return a + b * np.exp(-((x[0] - x0) ** 2) / (2 * sigma_x ** 2) - (x[1] - y0) ** 2 / (2 * sigma_y ** 2)) + + @staticmethod + def _check_fit_quality(xdata_restricted: np.ndarray[float], ydata_restricted: np.ndarray[float], + popt: Tuple[float, float, float, float, float, float]) -> None: + median_squared_relative_residue_threshold = 1e-1 + fit_ydata_restricted = GaussianFitter._gauss2d(xdata_restricted, *popt) + square_rel_res = np.square((fit_ydata_restricted - ydata_restricted) / fit_ydata_restricted) + median_squared_rel_res = np.median(np.square(square_rel_res)) + + if median_squared_rel_res > median_squared_relative_residue_threshold: + raise Exception( + f"Bad fit with median squared relative residue = {median_squared_rel_res}" + f" vs allowed value of {median_squared_relative_residue_threshold}" + ) \ No newline at end of file diff --git a/pyobs/images/processors/offsets/nstar/nstar.py b/pyobs/images/processors/offsets/nstar/nstar.py new file mode 100644 index 00000000..f0bbdb1b --- /dev/null +++ b/pyobs/images/processors/offsets/nstar/nstar.py @@ -0,0 +1,135 @@ +import logging +from typing import Tuple, List, Union, Dict, Any, Optional + +import numpy as np +from photutils.psf import EPSFStar +from scipy import signal + +from pyobs.images import Image, ImageProcessor +from pyobs.images.meta import PixelOffsets +from pyobs.images.processors.offsets.nstar._box_generator import _BoxGenerator +from pyobs.images.processors.offsets.nstar._gaussian_fitter import GaussianFitter +from pyobs.images.processors.offsets.offsets import Offsets +from pyobs.mixins.pipeline import PipelineMixin + +log = logging.getLogger(__name__) + + +class CorrelationMaxCloseToBorderError(Exception): + pass + + +class NStarOffsets(Offsets, PipelineMixin): + """An offset-calculation method based on comparing 2D images of the surroundings of a variable number of stars.""" + + def __init__( + self, + max_pixel_offset: float = 5.0, + min_sources: int = 1, + pipeline: Optional[List[Union[Dict[str, Any], ImageProcessor]]] = None, + **kwargs: Any, + ): + """Initializes an offset calculator. + + Args: + num_stars: maximum number of stars to use to calculate offset from boxes around them + max_pixel_offset: the maximal expected pixel offset. Determines the size of boxes around stars. + min_pixels: minimum required number of pixels for a source to be used for offset calculation. + min_sources: Minimum required number of sources in image. + pipeline: Pipeline to be used for first image in series. (Should use SEP detection and source filtering) + """ + Offsets.__init__(self, **kwargs) + PipelineMixin.__init__(self, pipeline) + + # store + self._box_size = max_pixel_offset + self._box_generator = _BoxGenerator(max_pixel_offset, min_sources=min_sources) + self.ref_boxes: List[EPSFStar] = [] + + async def reset(self) -> None: + """Resets guiding.""" + log.info("Reset auto-guiding.") + self.ref_boxes = [] + + async def __call__(self, image: Image) -> Image: + """Processes an image and sets x/y pixel offset to reference in offset attribute. + + Args: + image: Image to process. + + Returns: + Original image. + + Raises: + ValueError: If offset could not be found. + """ + + output_image = image.copy() + + if self._boxes_initialized(): + log.info("Initialising NStar offsets with new image...") + try: + await self._init_boxes(output_image) + output_image.set_meta(PixelOffsets(0.0, 0.0)) + except ValueError as e: + log.warning(f"Could not initialize reference image info due to exception '{e}'. Resetting...") + await self.reset() + if PixelOffsets in output_image.meta: + del output_image.meta[PixelOffsets] + + else: + log.info("Perform auto-guiding on new image...") + offsets = self._calculate_offsets(output_image) + if offsets[0] is not None and offsets[1] is not None: + output_image.set_meta(PixelOffsets(offsets[0], offsets[1])) + + return output_image + + def _boxes_initialized(self) -> bool: + return len(self.ref_boxes) == 0 + + async def _init_boxes(self, image: Image) -> None: + processed_image = await self.run_pipeline(image) + self.ref_boxes = self._box_generator(processed_image) + + def _calculate_offsets(self, image: Image) -> Tuple[Optional[float], Optional[float]]: + """Calculate offsets of given image to ref image for every star. + + Args: + image: Image to calculate offset for. + + Returns: + Offset in x and y dimension. + """ + + if (image_data := image.safe_data) is None: + return None, None + + offsets = np.fromiter( + filter( + lambda x: x is not None, + map(lambda x: NStarOffsets._calculate_star_offset(x, image_data), self.ref_boxes) + ), + np.dtype((float, 2)) + ) + + if len(offsets) == 0: + log.info(f"All {len(self.ref_boxes)} fits on boxed star correlations failed.") + return None, None + + return float(np.mean(offsets[:, 0])), float(np.mean(offsets[:, 1])) + + @staticmethod + def _calculate_star_offset(box: EPSFStar, image: np.ndarray) -> Optional[Tuple[float, float]]: + current_boxed_image = image[box.slices] + + corr = signal.correlate2d(current_boxed_image, box.data, mode="same", boundary="wrap") + + try: + return GaussianFitter.offsets_from_corr(corr) + except Exception as e: + log.info(f"Exception '{e}' caught. Ignoring this star.") + return None + + +__all__ = ["NStarOffsets"] diff --git a/tests/images/processors/misc/test_image_source_filter.py b/tests/images/processors/misc/test_image_source_filter.py new file mode 100644 index 00000000..338797e0 --- /dev/null +++ b/tests/images/processors/misc/test_image_source_filter.py @@ -0,0 +1,67 @@ +import numpy as np +import pytest +from astropy.table import Table + +from pyobs.images import Image +from pyobs.images.processors.misc import ImageSourceFilter + + +def test_fits2numpy() -> None: + table = Table() + for k in ["x", "y", "xmin", "xmax", "ymin", "ymax", "xpeak", "ypeak"]: + table[k] = [1] + + result = ImageSourceFilter._fits2numpy(table) + assert all(map(lambda x: x == 0, result.values())) + + +def test_remove_sources_close_to_border() -> None: + table = Table({"x": [1, 10, 19, 10, 10], "y": [10, 1, 10, 19, 10]}) + + source_filter = ImageSourceFilter(2, 10, 10, 10) + result = source_filter.remove_sources_close_to_border(table, (20, 20)) + np.testing.assert_array_equal(result["x"], [10]) + np.testing.assert_array_equal(result["y"], [10]) + + +def test_remove_bad_sources() -> None: + table = Table({ + "peak": [50000, 5001, 5002, 5003, 5004, 5005, 5006], + "tnpix": [10, 2, 100, 10, 10, 10, 10], + "ellipticity": [0.3, 0.3, 0.3, 0.5, 0.3, 0.3, 0.3], + "background": [100, 100, 100, 100, -1, 50000, 100] + }) + + result = ImageSourceFilter(10, 10, 3, 0.4).remove_bad_sources(table) + + np.testing.assert_array_equal(result["peak"], [5006]) + np.testing.assert_array_equal(result["tnpix"], [10]) + np.testing.assert_array_equal(result["ellipticity"], [0.3]) + np.testing.assert_array_equal(result["background"], [100]) + + +def test_select_brightest_sources() -> None: + table = Table({"flux": list(range(3))}) + + result = ImageSourceFilter(5.0, 2, 3, 2)._select_brightest_sources(table) + np.testing.assert_array_equal(result["flux"], list(reversed(range(1, 3)))) + + +@pytest.mark.asyncio +async def test_call() -> None: + sources = Table({ + "x": [10], + "y": [10], + "flux": [10], + "peak": [100], + "tnpix": [10], + "ellipticity": [0.0], + "background": [1.0] + }) + + image = Image(data=np.ones((20, 20)), catalog=sources) + + filter = ImageSourceFilter(2.0, 10, 3, 1) + + result = await filter(image) + assert result.catalog == sources diff --git a/tests/images/processors/detection/test_source_catalog.py b/tests/images/processors/misc/test_source_catalog.py similarity index 100% rename from tests/images/processors/detection/test_source_catalog.py rename to tests/images/processors/misc/test_source_catalog.py diff --git a/tests/images/processors/offsets/nstar/__init__.py b/tests/images/processors/offsets/nstar/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/images/processors/offsets/nstar/test_box_generator.py b/tests/images/processors/offsets/nstar/test_box_generator.py new file mode 100644 index 00000000..03430c59 --- /dev/null +++ b/tests/images/processors/offsets/nstar/test_box_generator.py @@ -0,0 +1,62 @@ +from typing import Any + +import numpy as np +import pytest +from astropy.table import Table + +from pyobs.images import Image +from pyobs.images.processors.offsets.nstar._box_generator import _BoxGenerator + + +def test_check_sources_count() -> None: + table = Table() + with pytest.raises(ValueError): + _BoxGenerator(5.0, 10)._check_sources_count(table) + + +class MockBox: + def __init__(self, center: np.ndarray[Any, Any]): + self.center = center + + +def test_check_overlapping_boxes() -> None: + boxes = [MockBox(np.array([1, 1])), MockBox(np.array([4, 4])), MockBox(np.array([10, 10]))] + generator = _BoxGenerator(3.0, 1) + + with pytest.raises(ValueError): + generator._check_overlapping_boxes(boxes) + + +def test_check_overlapping_box_pair_valid() -> None: + generator = _BoxGenerator(3.0, 1) + assert generator._check_overlapping_box_pair([1, 1], [10, 10]) is None + + +def test_check_overlapping_box_pair_invalid() -> None: + generator = _BoxGenerator(3.0, 1) + with pytest.raises(ValueError): + generator._check_overlapping_box_pair([1, 1], [10, 4]) + + with pytest.raises(ValueError): + generator._check_overlapping_box_pair([1, 1], [4, 10]) + + + +@pytest.mark.asyncio +async def test_call() -> None: + sources = Table({ + "x": [10], + "y": [10], + "flux": [10], + "peak": [100], + "tnpix": [10], + "ellipticity": [0.0], + "background": [1.0] + }) + + image = Image(data=np.ones((20, 20)), catalog=sources) + + generator = _BoxGenerator(2.0, 1) + + result = generator(image) + np.testing.assert_array_equal(result[0], np.ones((5, 5))) \ No newline at end of file diff --git a/tests/images/processors/offsets/nstar/test_gaussianfitter.py b/tests/images/processors/offsets/nstar/test_gaussianfitter.py new file mode 100644 index 00000000..b4386e9a --- /dev/null +++ b/tests/images/processors/offsets/nstar/test_gaussianfitter.py @@ -0,0 +1,45 @@ +from unittest.mock import Mock + +import numpy as np +import pytest +import scipy.optimize + +from pyobs.images.processors.offsets.nstar._gaussian_fitter import GaussianFitter + + +@pytest.fixture() +def gaussian_data(): + return np.array([ + [ + GaussianFitter._gauss2d([x, y], 1, 2, 10, 10, 1, 1) for x in range(21) + ] for y in range(21) + ]) + + +def test_offsets_from_corr(gaussian_data) -> None: + result = GaussianFitter().offsets_from_corr(gaussian_data.astype(float)) + + np.testing.assert_array_almost_equal(result, (0.0, 0.0), 10) + + +def test_offsets_from_corr_err(gaussian_data) -> None: + scipy.optimize.curve_fit = Mock(side_effect=RuntimeError) + + result = GaussianFitter().offsets_from_corr(gaussian_data.astype(float)) + assert result == (0.0, 0.0) + + +def test_check_corr_border() -> None: + corr = np.zeros((10, 10)) + + with pytest.raises(Exception): + GaussianFitter._check_peak_border_distance(corr.astype(float), 4, 4) + + +def test_check_fit_quality() -> None: + xdata_restricted = np.array([[0.0], [0.0]]) + ydata_restricted = np.array([[0.0], [0.0]]) + popt = (0.0, 1.0, 0.0, 0.0, 1.0, 1.0) + + with pytest.raises(Exception): + GaussianFitter._check_fit_quality(xdata_restricted, ydata_restricted, popt) diff --git a/tests/images/processors/offsets/nstar/test_nstar.py b/tests/images/processors/offsets/nstar/test_nstar.py new file mode 100644 index 00000000..8dd1f9b6 --- /dev/null +++ b/tests/images/processors/offsets/nstar/test_nstar.py @@ -0,0 +1,107 @@ +import logging +from unittest.mock import Mock + +import numpy as np +import pytest +from photutils.psf import EPSFStar +from pytest_mock import MockerFixture + +from pyobs.images import Image +from pyobs.images.meta import PixelOffsets +from pyobs.images.processors.offsets import NStarOffsets +from pyobs.images.processors.offsets.nstar._gaussian_fitter import GaussianFitter + + +@pytest.mark.asyncio +async def test_reset() -> None: + offsets = NStarOffsets() + offsets.ref_boxes = [np.ones((2, 2))] + await offsets.reset() + + assert len(offsets.ref_boxes) == 0 + + +def test_calculate_offsets_invalid_data() -> None: + image = Image() + + offsets = NStarOffsets() + + assert offsets._calculate_offsets(image) == (None, None) + + +def test_calculate_offsets_invalid_offsets(caplog) -> None: + image = Image(data=np.zeros((1, 1))) + + offsets = NStarOffsets() + offsets.ref_boxes = [] + + with caplog.at_level(logging.INFO): + assert offsets._calculate_offsets(image) == (None, None) + + assert caplog.messages[0] == "All 0 fits on boxed star correlations failed." + + +def test_calculate_offsets() -> None: + data = np.array([ + [ + GaussianFitter._gauss2d([x, y], 1, 2, 10, 10, 1, 1) for x in range(21) + ] for y in range(21) + ]) + + offsets = NStarOffsets() + offsets.ref_boxes = [EPSFStar(data[7:14, 7:14], origin=(8, 8))] + + result = offsets._calculate_offsets(Image(data=data)) + np.testing.assert_almost_equal(result, (-1.0, -1.0), 2) + + +def test_calculate_star_offset_invalid(caplog) -> None: + GaussianFitter.offsets_from_corr = Mock(side_effect=Exception("Invalid")) + box = EPSFStar(np.ones((2, 2)), origin=(8, 8)) + image = np.ones((3, 3)) + + with caplog.at_level(logging.INFO): + assert NStarOffsets._calculate_star_offset(box, image) is None + + assert caplog.messages[0] == "Exception 'Invalid' caught. Ignoring this star." + + +@pytest.mark.asyncio +async def test_call_init() -> None: + image = Image() + offsets = NStarOffsets() + + boxes = [EPSFStar(np.ones((10, 10)), origin=(8, 8))] + offsets._box_generator = Mock(return_value=boxes) + + result = await offsets(image) + assert result.get_meta(PixelOffsets).dx == 0.0 + assert result.get_meta(PixelOffsets).dy == 0.0 + + assert offsets.ref_boxes == boxes + + +@pytest.mark.asyncio +async def test_call_invalid_init() -> None: + image = Image() + image.set_meta(PixelOffsets(1.0, 1.0)) + offsets = NStarOffsets() + offsets._box_generator = Mock(side_effect=ValueError) + + result = await offsets(image) + + assert not result.has_meta(PixelOffsets) + + assert offsets.ref_boxes == [] + + +@pytest.mark.asyncio +async def test_call(mocker: MockerFixture) -> None: + image = Image() + offsets = NStarOffsets() + offsets.ref_boxes = [EPSFStar(np.ones((10, 10)), origin=(8, 8))] + mocker.patch.object(offsets, "_calculate_offsets", return_value=(1.0, -1.0)) + + result = await offsets(image) + assert result.get_meta(PixelOffsets).dx == 1.0 + assert result.get_meta(PixelOffsets).dy == -1.0