-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
NStarOffsets testing & refactor (#338)
* Added some tests to nstar offsets * Added missing basic unit tests to nstar offsets * Removed unnecessary code * Seperated theausssian fit from nstar oss * Fixed docstrings * Fixed default fit result on fit failure * Removed duplication in tests * Added box generator to seperate initial box definition from nstar class * Simplified remove bad sources * Added some clarifications to box generator * Integrated box generator with nstar * Refactored nstar offset calculation * Added missing testcase and None handling * Changed nstars max offset to pixels instead of arc seconds * Seperated the source filtering in box generator into a seperate image processor * Added box overlapping check to box generator * Simplified overlap checking in box generator * Changed background filter calculation to weber contrast * Added docstring to image source filter * Added comment to nstar docstring
1 parent
8fb4466
commit 40d49f9
Showing
14 changed files
with
712 additions
and
401 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
from copy import copy | ||
from typing import Tuple, Any | ||
|
||
import numpy as np | ||
from astropy.table import Table | ||
|
||
from pyobs.images import ImageProcessor, Image | ||
|
||
|
||
class ImageSourceFilter(ImageProcessor): | ||
def __init__(self, | ||
min_dist_to_border: float, | ||
num_stars: int, | ||
min_pixels: int, | ||
max_ellipticity: float = 0.4, | ||
min_weber_contrast: float = 1.5, | ||
max_saturation: int = 50000) -> None: | ||
""" | ||
Filters the source table after pysep detection has run | ||
Args: | ||
min_dist_to_border: Minimal distance to the image border | ||
num_stars: Number of sources to take | ||
min_pixels: Minimum required amount of pixels of a source | ||
max_ellipticity: Maximum allowed ellipticity of a source | ||
min_weber_contrast: Minimum required weber contrast of a source (relative to the background) | ||
max_saturation: | ||
""" | ||
|
||
super().__init__() | ||
|
||
self._min_dist_to_border = min_dist_to_border | ||
self._num_stars = num_stars | ||
self._min_pixels = min_pixels | ||
self._max_ellipticity = max_ellipticity | ||
self._min_weber_contrast = min_weber_contrast | ||
self._max_saturation = max_saturation | ||
|
||
async def __call__(self, image: Image) -> Image: | ||
working_image = copy(image) | ||
sources_copy = working_image.catalog.copy() | ||
|
||
valid_sources = self.remove_sources_close_to_border(sources_copy, working_image.data.shape) | ||
good_sources = self.remove_bad_sources(valid_sources) | ||
selected_sources = self._select_brightest_sources(good_sources) | ||
|
||
working_image.catalog = selected_sources | ||
|
||
return working_image | ||
|
||
@staticmethod | ||
def _fits2numpy(sources: Table) -> Table: | ||
"""Convert from FITS to numpy conventions for pixel coordinates.""" | ||
for k in ["x", "y", "xmin", "xmax", "ymin", "ymax", "xpeak", "ypeak"]: | ||
if k in sources.keys(): | ||
sources[k] -= 1 | ||
return sources | ||
|
||
def remove_sources_close_to_border(self, sources: Table, image_shape: Tuple[int, int]) -> Table: | ||
"""Remove table rows from sources when source is closer than given distance from border of image. | ||
Args: | ||
sources: Input table. | ||
image_shape: Shape of image. | ||
Returns: | ||
Filtered table. | ||
.""" | ||
|
||
width, height = image_shape | ||
|
||
x_dist_from_border = width / 2 - np.abs(sources["y"] - width / 2) | ||
y_dist_from_border = height / 2 - np.abs(sources["x"] - height / 2) | ||
|
||
min_dist_from_border = np.minimum(x_dist_from_border, y_dist_from_border) | ||
sources_result = sources[min_dist_from_border > self._min_dist_to_border] | ||
|
||
return sources_result | ||
|
||
def remove_bad_sources( | ||
self, sources: Table, | ||
) -> Table: | ||
"""Remove bad sources from table. | ||
Args: | ||
sources: Input table. | ||
Returns: | ||
Filtered table. | ||
""" | ||
|
||
saturated_sources = sources["peak"] >= self._max_saturation | ||
|
||
small_sources = sources["tnpix"] < self._min_pixels | ||
|
||
tnpix_median = np.median(sources["tnpix"]) | ||
tnpix_std = np.std(sources["tnpix"]) | ||
large_sources = sources["tnpix"] > tnpix_median + 2 * tnpix_std | ||
|
||
elliptic_sources = sources["ellipticity"] > self._max_ellipticity | ||
|
||
background_sources = sources["background"] <= 0 | ||
|
||
low_contrast_sources = self._calc_weber_contrast(sources["peak"], sources["background"]) <= self._min_weber_contrast | ||
|
||
bad_sources = saturated_sources | small_sources | large_sources | elliptic_sources | background_sources | low_contrast_sources | ||
|
||
filtered_sources = sources[~bad_sources] # keep sources that are not bad | ||
return filtered_sources | ||
|
||
@staticmethod | ||
def _calc_weber_contrast(peak: np.ndarray[Any, Any], background: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]: | ||
return (peak - background)/background | ||
|
||
def _select_brightest_sources(self, sources: Table) -> Table: | ||
"""Select the N brightest sources from table. | ||
Args: | ||
sources: Source table. | ||
Returns: | ||
table containing the N brightest sources. | ||
""" | ||
|
||
sources.sort("flux", reverse=True) | ||
|
||
if 0 < self._num_stars < len(sources): | ||
sources = sources[:self._num_stars] | ||
return sources |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import itertools | ||
from typing import List, Any | ||
|
||
import numpy as np | ||
import photutils | ||
from astropy.nddata import NDData | ||
from astropy.table import Table | ||
from photutils.psf import EPSFStar | ||
|
||
from pyobs.images import Image | ||
|
||
|
||
class _BoxGenerator(object): | ||
def __init__(self, max_offset: float, min_sources: int) -> None: | ||
self._box_size = self._max_offset_to_box_size(max_offset) | ||
self._min_sources = min_sources | ||
|
||
@staticmethod | ||
def _max_offset_to_box_size(max_offset: float) -> int: | ||
box_size = 2 * max_offset + 1 # photutils.psf.extract_stars only accepts uneven box sizes | ||
return int(box_size) | ||
|
||
def __call__(self, image: Image) -> List[EPSFStar]: | ||
sources = image.catalog | ||
self._check_sources_count(sources) | ||
|
||
# extract boxes | ||
boxes = photutils.psf.extract_stars( | ||
NDData(image.data.astype(float)), sources, size=self._box_size | ||
).all_stars | ||
|
||
self._check_overlapping_boxes(boxes) | ||
return boxes | ||
|
||
def _check_sources_count(self, sources: Table) -> None: | ||
if len(sources) < self._min_sources: | ||
raise ValueError(f"Only {len(sources)} source(s) in image, but at least {self._min_sources} required.") | ||
|
||
def _check_overlapping_boxes(self, boxes: List[EPSFStar]) -> None: | ||
for (box_1, box_2) in itertools.combinations(boxes, 2): | ||
self._check_overlapping_box_pair(box_1.center, box_2.center) | ||
|
||
def _check_overlapping_box_pair(self, box_center_1: np.ndarray[Any, Any], box_center_2: np.ndarray[Any, Any]) -> None: | ||
dist_2d = np.abs(np.subtract(box_center_1, box_center_2)) | ||
|
||
if dist_2d[0] < self._box_size / 2 or dist_2d[1] < self._box_size / 2: | ||
raise ValueError("Boxes are overlapping!") |
119 changes: 119 additions & 0 deletions
119
pyobs/images/processors/offsets/nstar/_gaussian_fitter.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
import logging | ||
from typing import Tuple, Any | ||
|
||
import numpy as np | ||
from scipy import optimize | ||
from scipy.optimize import OptimizeWarning | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class GaussianFitter(object): | ||
|
||
@staticmethod | ||
def offsets_from_corr(corr: np.ndarray[float]) -> Tuple[float, float]: | ||
"""Fit 2d correlation data with a 2d gaussian + constant offset. | ||
raise CorrelationMaxCloseToBorderError if the correlation maximum is not well separated from border.""" | ||
|
||
xdata_restricted, ydata_restricted, p0, bounds = GaussianFitter._init_fit(corr) | ||
|
||
try: | ||
fit_result, _ = optimize.curve_fit(GaussianFitter._gauss2d, xdata_restricted, ydata_restricted, p0, bounds=bounds) | ||
except (ValueError, RuntimeError, OptimizeWarning): | ||
|
||
log.info("Returning pixel position with maximal value in correlation.") | ||
return p0[2], p0[3] | ||
|
||
GaussianFitter._check_fit_quality(xdata_restricted, ydata_restricted, fit_result) | ||
|
||
return fit_result[2], fit_result[3] | ||
|
||
@staticmethod | ||
def _init_fit(corr: np.ndarray[float]) -> tuple[ | ||
np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[float]], tuple[float, float, float, float, float, float], tuple[ | ||
tuple[float, float, float, float, int, int], tuple[float, float, float, float, float, float]]]: | ||
# get x,y positions array corresponding to the independent variable values of the correlation | ||
x, y = GaussianFitter._corr_grid(corr) | ||
|
||
# gaussian peak position (estimate from maximum pixel position in correlation) | ||
max_index = np.array(np.unravel_index(np.argmax(corr), corr.shape)) | ||
peak_x, peak_y = x[tuple(max_index)], y[tuple(max_index)] | ||
|
||
# check if maximum of correlation is too close to border | ||
GaussianFitter._check_peak_border_distance(corr, peak_x, peak_y) | ||
|
||
# estimate initial parameter values | ||
# constant offset of 2d gaussian | ||
background = np.min(corr) | ||
|
||
# height of 2d gaussian | ||
peak_height = np.max(corr) - background | ||
|
||
# estimate width of 2d gaussian as radius of area with values above half maximum | ||
half_max = np.max(corr - background) / 2 + background | ||
|
||
# sum over binary array | ||
greater_than_half_max_area = np.sum(corr >= half_max) | ||
sigma_x = np.sqrt(greater_than_half_max_area / np.pi) | ||
sigma_y = sigma_x | ||
|
||
# initial value list | ||
p0 = (background, peak_height, peak_x, peak_y, sigma_x, sigma_y) | ||
bounds = ( | ||
(-np.inf, -np.inf, peak_x - sigma_x, peak_y - sigma_y, 0, 0), | ||
(np.inf, np.inf, peak_x + sigma_x, peak_y + sigma_y, np.inf, np.inf), | ||
) | ||
|
||
# shape data as needed by R^2 -> R scipy curve_fit | ||
xdata = np.vstack((x.ravel(), y.ravel())) | ||
ydata = corr.ravel() | ||
|
||
# only use data points that clearly belong to peak to avoid border effects | ||
# mask_value_above_background = ydata > -1e5 # a + .1*b | ||
mask_circle_around_peak = (x.ravel() - peak_x) ** 2 + (y.ravel() - peak_y) ** 2 < 4 * ( | ||
sigma_x ** 2 + sigma_y ** 2) / 2 | ||
mask = mask_circle_around_peak | ||
ydata_restricted = ydata[mask] | ||
xdata_restricted = xdata[:, mask] | ||
|
||
return xdata_restricted, ydata_restricted, p0, bounds | ||
|
||
@staticmethod | ||
def _corr_grid(corr: np.ndarray[float]) -> np.ndarray[float]: | ||
"""Create x/y grid for given 2D correlation.""" | ||
xs = np.arange(-corr.shape[0] / 2, corr.shape[0] / 2) + 0.5 | ||
ys = np.arange(-corr.shape[1] / 2, corr.shape[1] / 2) + 0.5 | ||
return np.meshgrid(xs, ys) | ||
|
||
@staticmethod | ||
def _check_peak_border_distance(corr: np.ndarray[float], peak_x: float, peak_y: float) -> None: | ||
"""Check whether maximum of correlation is too close to border.""" | ||
|
||
corr_size = corr.shape[0] | ||
|
||
if peak_x < -corr_size / 4 or peak_x > corr_size / 4 or peak_y < -corr_size / 4 or peak_y > corr_size / 4: | ||
raise Exception( | ||
"Maximum of correlation is outside center half of axes. " | ||
"This means that either the given image data is bad, or the offset is larger than expected." | ||
) | ||
|
||
@staticmethod | ||
def _gauss2d( | ||
x: np.ndarray[float], a: float, b: float, x0: float, y0: float, sigma_x: float, sigma_y: float | ||
) -> np.ndarray[float]: | ||
"""2D Gaussian function.""" | ||
return a + b * np.exp(-((x[0] - x0) ** 2) / (2 * sigma_x ** 2) - (x[1] - y0) ** 2 / (2 * sigma_y ** 2)) | ||
|
||
@staticmethod | ||
def _check_fit_quality(xdata_restricted: np.ndarray[float], ydata_restricted: np.ndarray[float], | ||
popt: Tuple[float, float, float, float, float, float]) -> None: | ||
median_squared_relative_residue_threshold = 1e-1 | ||
fit_ydata_restricted = GaussianFitter._gauss2d(xdata_restricted, *popt) | ||
square_rel_res = np.square((fit_ydata_restricted - ydata_restricted) / fit_ydata_restricted) | ||
median_squared_rel_res = np.median(np.square(square_rel_res)) | ||
|
||
if median_squared_rel_res > median_squared_relative_residue_threshold: | ||
raise Exception( | ||
f"Bad fit with median squared relative residue = {median_squared_rel_res}" | ||
f" vs allowed value of {median_squared_relative_residue_threshold}" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
import logging | ||
from typing import Tuple, List, Union, Dict, Any, Optional | ||
|
||
import numpy as np | ||
from photutils.psf import EPSFStar | ||
from scipy import signal | ||
|
||
from pyobs.images import Image, ImageProcessor | ||
from pyobs.images.meta import PixelOffsets | ||
from pyobs.images.processors.offsets.nstar._box_generator import _BoxGenerator | ||
from pyobs.images.processors.offsets.nstar._gaussian_fitter import GaussianFitter | ||
from pyobs.images.processors.offsets.offsets import Offsets | ||
from pyobs.mixins.pipeline import PipelineMixin | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class CorrelationMaxCloseToBorderError(Exception): | ||
pass | ||
|
||
|
||
class NStarOffsets(Offsets, PipelineMixin): | ||
"""An offset-calculation method based on comparing 2D images of the surroundings of a variable number of stars.""" | ||
|
||
def __init__( | ||
self, | ||
max_pixel_offset: float = 5.0, | ||
min_sources: int = 1, | ||
pipeline: Optional[List[Union[Dict[str, Any], ImageProcessor]]] = None, | ||
**kwargs: Any, | ||
): | ||
"""Initializes an offset calculator. | ||
Args: | ||
num_stars: maximum number of stars to use to calculate offset from boxes around them | ||
max_pixel_offset: the maximal expected pixel offset. Determines the size of boxes around stars. | ||
min_pixels: minimum required number of pixels for a source to be used for offset calculation. | ||
min_sources: Minimum required number of sources in image. | ||
pipeline: Pipeline to be used for first image in series. (Should use SEP detection and source filtering) | ||
""" | ||
Offsets.__init__(self, **kwargs) | ||
PipelineMixin.__init__(self, pipeline) | ||
|
||
# store | ||
self._box_size = max_pixel_offset | ||
self._box_generator = _BoxGenerator(max_pixel_offset, min_sources=min_sources) | ||
self.ref_boxes: List[EPSFStar] = [] | ||
|
||
async def reset(self) -> None: | ||
"""Resets guiding.""" | ||
log.info("Reset auto-guiding.") | ||
self.ref_boxes = [] | ||
|
||
async def __call__(self, image: Image) -> Image: | ||
"""Processes an image and sets x/y pixel offset to reference in offset attribute. | ||
Args: | ||
image: Image to process. | ||
Returns: | ||
Original image. | ||
Raises: | ||
ValueError: If offset could not be found. | ||
""" | ||
|
||
output_image = image.copy() | ||
|
||
if self._boxes_initialized(): | ||
log.info("Initialising NStar offsets with new image...") | ||
try: | ||
await self._init_boxes(output_image) | ||
output_image.set_meta(PixelOffsets(0.0, 0.0)) | ||
except ValueError as e: | ||
log.warning(f"Could not initialize reference image info due to exception '{e}'. Resetting...") | ||
await self.reset() | ||
if PixelOffsets in output_image.meta: | ||
del output_image.meta[PixelOffsets] | ||
|
||
else: | ||
log.info("Perform auto-guiding on new image...") | ||
offsets = self._calculate_offsets(output_image) | ||
if offsets[0] is not None and offsets[1] is not None: | ||
output_image.set_meta(PixelOffsets(offsets[0], offsets[1])) | ||
|
||
return output_image | ||
|
||
def _boxes_initialized(self) -> bool: | ||
return len(self.ref_boxes) == 0 | ||
|
||
async def _init_boxes(self, image: Image) -> None: | ||
processed_image = await self.run_pipeline(image) | ||
self.ref_boxes = self._box_generator(processed_image) | ||
|
||
def _calculate_offsets(self, image: Image) -> Tuple[Optional[float], Optional[float]]: | ||
"""Calculate offsets of given image to ref image for every star. | ||
Args: | ||
image: Image to calculate offset for. | ||
Returns: | ||
Offset in x and y dimension. | ||
""" | ||
|
||
if (image_data := image.safe_data) is None: | ||
return None, None | ||
|
||
offsets = np.fromiter( | ||
filter( | ||
lambda x: x is not None, | ||
map(lambda x: NStarOffsets._calculate_star_offset(x, image_data), self.ref_boxes) | ||
), | ||
np.dtype((float, 2)) | ||
) | ||
|
||
if len(offsets) == 0: | ||
log.info(f"All {len(self.ref_boxes)} fits on boxed star correlations failed.") | ||
return None, None | ||
|
||
return float(np.mean(offsets[:, 0])), float(np.mean(offsets[:, 1])) | ||
|
||
@staticmethod | ||
def _calculate_star_offset(box: EPSFStar, image: np.ndarray) -> Optional[Tuple[float, float]]: | ||
current_boxed_image = image[box.slices] | ||
|
||
corr = signal.correlate2d(current_boxed_image, box.data, mode="same", boundary="wrap") | ||
|
||
try: | ||
return GaussianFitter.offsets_from_corr(corr) | ||
except Exception as e: | ||
log.info(f"Exception '{e}' caught. Ignoring this star.") | ||
return None | ||
|
||
|
||
__all__ = ["NStarOffsets"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import numpy as np | ||
import pytest | ||
from astropy.table import Table | ||
|
||
from pyobs.images import Image | ||
from pyobs.images.processors.misc import ImageSourceFilter | ||
|
||
|
||
def test_fits2numpy() -> None: | ||
table = Table() | ||
for k in ["x", "y", "xmin", "xmax", "ymin", "ymax", "xpeak", "ypeak"]: | ||
table[k] = [1] | ||
|
||
result = ImageSourceFilter._fits2numpy(table) | ||
assert all(map(lambda x: x == 0, result.values())) | ||
|
||
|
||
def test_remove_sources_close_to_border() -> None: | ||
table = Table({"x": [1, 10, 19, 10, 10], "y": [10, 1, 10, 19, 10]}) | ||
|
||
source_filter = ImageSourceFilter(2, 10, 10, 10) | ||
result = source_filter.remove_sources_close_to_border(table, (20, 20)) | ||
np.testing.assert_array_equal(result["x"], [10]) | ||
np.testing.assert_array_equal(result["y"], [10]) | ||
|
||
|
||
def test_remove_bad_sources() -> None: | ||
table = Table({ | ||
"peak": [50000, 5001, 5002, 5003, 5004, 5005, 5006], | ||
"tnpix": [10, 2, 100, 10, 10, 10, 10], | ||
"ellipticity": [0.3, 0.3, 0.3, 0.5, 0.3, 0.3, 0.3], | ||
"background": [100, 100, 100, 100, -1, 50000, 100] | ||
}) | ||
|
||
result = ImageSourceFilter(10, 10, 3, 0.4).remove_bad_sources(table) | ||
|
||
np.testing.assert_array_equal(result["peak"], [5006]) | ||
np.testing.assert_array_equal(result["tnpix"], [10]) | ||
np.testing.assert_array_equal(result["ellipticity"], [0.3]) | ||
np.testing.assert_array_equal(result["background"], [100]) | ||
|
||
|
||
def test_select_brightest_sources() -> None: | ||
table = Table({"flux": list(range(3))}) | ||
|
||
result = ImageSourceFilter(5.0, 2, 3, 2)._select_brightest_sources(table) | ||
np.testing.assert_array_equal(result["flux"], list(reversed(range(1, 3)))) | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_call() -> None: | ||
sources = Table({ | ||
"x": [10], | ||
"y": [10], | ||
"flux": [10], | ||
"peak": [100], | ||
"tnpix": [10], | ||
"ellipticity": [0.0], | ||
"background": [1.0] | ||
}) | ||
|
||
image = Image(data=np.ones((20, 20)), catalog=sources) | ||
|
||
filter = ImageSourceFilter(2.0, 10, 3, 1) | ||
|
||
result = await filter(image) | ||
assert result.catalog == sources |
File renamed without changes.
Empty file.
62 changes: 62 additions & 0 deletions
62
tests/images/processors/offsets/nstar/test_box_generator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
from typing import Any | ||
|
||
import numpy as np | ||
import pytest | ||
from astropy.table import Table | ||
|
||
from pyobs.images import Image | ||
from pyobs.images.processors.offsets.nstar._box_generator import _BoxGenerator | ||
|
||
|
||
def test_check_sources_count() -> None: | ||
table = Table() | ||
with pytest.raises(ValueError): | ||
_BoxGenerator(5.0, 10)._check_sources_count(table) | ||
|
||
|
||
class MockBox: | ||
def __init__(self, center: np.ndarray[Any, Any]): | ||
self.center = center | ||
|
||
|
||
def test_check_overlapping_boxes() -> None: | ||
boxes = [MockBox(np.array([1, 1])), MockBox(np.array([4, 4])), MockBox(np.array([10, 10]))] | ||
generator = _BoxGenerator(3.0, 1) | ||
|
||
with pytest.raises(ValueError): | ||
generator._check_overlapping_boxes(boxes) | ||
|
||
|
||
def test_check_overlapping_box_pair_valid() -> None: | ||
generator = _BoxGenerator(3.0, 1) | ||
assert generator._check_overlapping_box_pair([1, 1], [10, 10]) is None | ||
|
||
|
||
def test_check_overlapping_box_pair_invalid() -> None: | ||
generator = _BoxGenerator(3.0, 1) | ||
with pytest.raises(ValueError): | ||
generator._check_overlapping_box_pair([1, 1], [10, 4]) | ||
|
||
with pytest.raises(ValueError): | ||
generator._check_overlapping_box_pair([1, 1], [4, 10]) | ||
|
||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_call() -> None: | ||
sources = Table({ | ||
"x": [10], | ||
"y": [10], | ||
"flux": [10], | ||
"peak": [100], | ||
"tnpix": [10], | ||
"ellipticity": [0.0], | ||
"background": [1.0] | ||
}) | ||
|
||
image = Image(data=np.ones((20, 20)), catalog=sources) | ||
|
||
generator = _BoxGenerator(2.0, 1) | ||
|
||
result = generator(image) | ||
np.testing.assert_array_equal(result[0], np.ones((5, 5))) |
45 changes: 45 additions & 0 deletions
45
tests/images/processors/offsets/nstar/test_gaussianfitter.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from unittest.mock import Mock | ||
|
||
import numpy as np | ||
import pytest | ||
import scipy.optimize | ||
|
||
from pyobs.images.processors.offsets.nstar._gaussian_fitter import GaussianFitter | ||
|
||
|
||
@pytest.fixture() | ||
def gaussian_data(): | ||
return np.array([ | ||
[ | ||
GaussianFitter._gauss2d([x, y], 1, 2, 10, 10, 1, 1) for x in range(21) | ||
] for y in range(21) | ||
]) | ||
|
||
|
||
def test_offsets_from_corr(gaussian_data) -> None: | ||
result = GaussianFitter().offsets_from_corr(gaussian_data.astype(float)) | ||
|
||
np.testing.assert_array_almost_equal(result, (0.0, 0.0), 10) | ||
|
||
|
||
def test_offsets_from_corr_err(gaussian_data) -> None: | ||
scipy.optimize.curve_fit = Mock(side_effect=RuntimeError) | ||
|
||
result = GaussianFitter().offsets_from_corr(gaussian_data.astype(float)) | ||
assert result == (0.0, 0.0) | ||
|
||
|
||
def test_check_corr_border() -> None: | ||
corr = np.zeros((10, 10)) | ||
|
||
with pytest.raises(Exception): | ||
GaussianFitter._check_peak_border_distance(corr.astype(float), 4, 4) | ||
|
||
|
||
def test_check_fit_quality() -> None: | ||
xdata_restricted = np.array([[0.0], [0.0]]) | ||
ydata_restricted = np.array([[0.0], [0.0]]) | ||
popt = (0.0, 1.0, 0.0, 0.0, 1.0, 1.0) | ||
|
||
with pytest.raises(Exception): | ||
GaussianFitter._check_fit_quality(xdata_restricted, ydata_restricted, popt) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import logging | ||
from unittest.mock import Mock | ||
|
||
import numpy as np | ||
import pytest | ||
from photutils.psf import EPSFStar | ||
from pytest_mock import MockerFixture | ||
|
||
from pyobs.images import Image | ||
from pyobs.images.meta import PixelOffsets | ||
from pyobs.images.processors.offsets import NStarOffsets | ||
from pyobs.images.processors.offsets.nstar._gaussian_fitter import GaussianFitter | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_reset() -> None: | ||
offsets = NStarOffsets() | ||
offsets.ref_boxes = [np.ones((2, 2))] | ||
await offsets.reset() | ||
|
||
assert len(offsets.ref_boxes) == 0 | ||
|
||
|
||
def test_calculate_offsets_invalid_data() -> None: | ||
image = Image() | ||
|
||
offsets = NStarOffsets() | ||
|
||
assert offsets._calculate_offsets(image) == (None, None) | ||
|
||
|
||
def test_calculate_offsets_invalid_offsets(caplog) -> None: | ||
image = Image(data=np.zeros((1, 1))) | ||
|
||
offsets = NStarOffsets() | ||
offsets.ref_boxes = [] | ||
|
||
with caplog.at_level(logging.INFO): | ||
assert offsets._calculate_offsets(image) == (None, None) | ||
|
||
assert caplog.messages[0] == "All 0 fits on boxed star correlations failed." | ||
|
||
|
||
def test_calculate_offsets() -> None: | ||
data = np.array([ | ||
[ | ||
GaussianFitter._gauss2d([x, y], 1, 2, 10, 10, 1, 1) for x in range(21) | ||
] for y in range(21) | ||
]) | ||
|
||
offsets = NStarOffsets() | ||
offsets.ref_boxes = [EPSFStar(data[7:14, 7:14], origin=(8, 8))] | ||
|
||
result = offsets._calculate_offsets(Image(data=data)) | ||
np.testing.assert_almost_equal(result, (-1.0, -1.0), 2) | ||
|
||
|
||
def test_calculate_star_offset_invalid(caplog) -> None: | ||
GaussianFitter.offsets_from_corr = Mock(side_effect=Exception("Invalid")) | ||
box = EPSFStar(np.ones((2, 2)), origin=(8, 8)) | ||
image = np.ones((3, 3)) | ||
|
||
with caplog.at_level(logging.INFO): | ||
assert NStarOffsets._calculate_star_offset(box, image) is None | ||
|
||
assert caplog.messages[0] == "Exception 'Invalid' caught. Ignoring this star." | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_call_init() -> None: | ||
image = Image() | ||
offsets = NStarOffsets() | ||
|
||
boxes = [EPSFStar(np.ones((10, 10)), origin=(8, 8))] | ||
offsets._box_generator = Mock(return_value=boxes) | ||
|
||
result = await offsets(image) | ||
assert result.get_meta(PixelOffsets).dx == 0.0 | ||
assert result.get_meta(PixelOffsets).dy == 0.0 | ||
|
||
assert offsets.ref_boxes == boxes | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_call_invalid_init() -> None: | ||
image = Image() | ||
image.set_meta(PixelOffsets(1.0, 1.0)) | ||
offsets = NStarOffsets() | ||
offsets._box_generator = Mock(side_effect=ValueError) | ||
|
||
result = await offsets(image) | ||
|
||
assert not result.has_meta(PixelOffsets) | ||
|
||
assert offsets.ref_boxes == [] | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_call(mocker: MockerFixture) -> None: | ||
image = Image() | ||
offsets = NStarOffsets() | ||
offsets.ref_boxes = [EPSFStar(np.ones((10, 10)), origin=(8, 8))] | ||
mocker.patch.object(offsets, "_calculate_offsets", return_value=(1.0, -1.0)) | ||
|
||
result = await offsets(image) | ||
assert result.get_meta(PixelOffsets).dx == 1.0 | ||
assert result.get_meta(PixelOffsets).dy == -1.0 |