diff --git a/pyobs/images/processors/photometry/_photometry_calculator.py b/pyobs/images/processors/photometry/_photometry_calculator.py new file mode 100644 index 00000000..b2b6f1ff --- /dev/null +++ b/pyobs/images/processors/photometry/_photometry_calculator.py @@ -0,0 +1,23 @@ +from abc import ABCMeta, abstractmethod +from typing import List, Tuple + +from astropy.table import QTable + +from pyobs.images import Image + + +class _PhotometryCalculator(metaclass=ABCMeta): + """Abstract class for photometry calculators.""" + + @property + @abstractmethod + def catalog(self) -> QTable: + ... + + @abstractmethod + def set_data(self, image: Image): + ... + + @abstractmethod + def __call__(self, diameter: int): + ... diff --git a/pyobs/images/processors/photometry/_photutil_aperture_photometry.py b/pyobs/images/processors/photometry/_photutil_aperture_photometry.py new file mode 100644 index 00000000..3d83a2f0 --- /dev/null +++ b/pyobs/images/processors/photometry/_photutil_aperture_photometry.py @@ -0,0 +1,88 @@ +from typing import List, Tuple, Optional + +import numpy as np +from astropy.stats import sigma_clipped_stats +from astropy.table import QTable +from photutils.aperture import CircularAperture, CircularAnnulus, ApertureMask, aperture_photometry + +from pyobs.images import Image +from pyobs.images.processors.photometry._photometry_calculator import _PhotometryCalculator + + +class _PhotUtilAperturePhotometry(_PhotometryCalculator): + + def __init__(self): + self._image: Optional[Image] = None + self._positions: Optional[List[Tuple[float, float]]] = None + + def set_data(self, image: Image): + self._image = image.copy() + self._positions = [(x - 1, y - 1) for x, y in image.catalog.iterrows("x", "y")] + + @property + def catalog(self): + return self._image.catalog + + def __call__(self, diameter: int): + radius = self._calc_aperture_radius_in_px(diameter) + if radius < 1: + return + + aperture = CircularAperture(self._positions, r=radius) + aperture_flux, aperture_error = self._calc_aperture_flux(aperture) + + median_background = self._calc_median_backgrounds(radius) + aperture_background = self._calc_integrated_background(median_background, aperture) + + corrected_aperture = self._background_correct_aperture_flux(aperture_flux, aperture_background) + + self._update_catalog(diameter, corrected_aperture, aperture_error, median_background) + + def _calc_aperture_radius_in_px(self, diameter: int): + radius = diameter / 2.0 + return radius / self._image.pixel_scale + + def _calc_median_backgrounds(self, radius: float) -> np.ndarray[float]: + annulus_aperture = CircularAnnulus(self._positions, r_in=2 * radius, r_out=3 * radius) + annulus_masks = annulus_aperture.to_mask(method="center") + + bkg_median = [ + self._calc_median_background(mask) + for mask in annulus_masks + ] + + return np.array(bkg_median) + + def _calc_median_background(self, mask: ApertureMask) -> float: + annulus_data = mask.multiply(self._image.data) + annulus_data_1d = annulus_data[mask.data > 0] + _, sigma_clipped_median, _ = sigma_clipped_stats(annulus_data_1d) + return sigma_clipped_median + + @staticmethod + def _calc_integrated_background(median_background: np.ndarray[float], aperture: CircularAperture) -> np.ndarray[float]: + return median_background * aperture.area + + def _calc_aperture_flux(self, aperture: CircularAperture) -> Tuple[ + np.ndarray[float], Optional[np.ndarray[float]]]: + + phot: QTable = aperture_photometry(self._image.data, aperture, mask=self._image.safe_mask, + error=self._image.safe_uncertainty) + + aperture_flux = phot["aperture_sum"] + aperture_error = phot["aperture_sum_err"] if "aperture_sum_err" in phot.keys() else None + + return aperture_flux, aperture_error + + @staticmethod + def _background_correct_aperture_flux(aperture_flux: np.ndarray[float], aperture_background: np.ndarray[float]) -> \ + np.ndarray[float]: + return aperture_flux - aperture_background + + def _update_catalog(self, diameter: int, corrected_aperture_flux: np.ndarray[float], + aperture_error: Optional[np.ndarray[float]], median_background: np.ndarray[float]): + + self._image.catalog["fluxaper%d" % diameter] = corrected_aperture_flux + if aperture_error is not None: + self._image.catalog["fluxerr%d" % diameter] = aperture_error + self._image.catalog["bkgaper%d" % diameter] = median_background diff --git a/pyobs/images/processors/photometry/_sep_aperture_photometry.py b/pyobs/images/processors/photometry/_sep_aperture_photometry.py new file mode 100644 index 00000000..f5dbb0a9 --- /dev/null +++ b/pyobs/images/processors/photometry/_sep_aperture_photometry.py @@ -0,0 +1,84 @@ +from typing import List, Optional, Any + +import numpy as np +from astropy.table import Table + +from pyobs.images import Image +from pyobs.images.processors.detection import SepSourceDetection +from pyobs.images.processors.photometry._photometry_calculator import _PhotometryCalculator + + +class _SepAperturePhotometry(_PhotometryCalculator): + def __init__(self) -> None: + self._image: Optional[Image] = None + self._pos_x: Optional[List[float]] = None + self._pos_y: Optional[List[float]] = None + + self._gain: Optional[float] = None + + self._data: Optional[np.ndarray[int, Any]] = None + self._average_background: Optional[np.ndarray[int, float]] = None + + def set_data(self, image: Image) -> None: + self._image = image.copy() + self._pos_x, self._pos_y = self._image.catalog["x"], self._image.catalog["y"] + self._gain = image.header["DET-GAIN"] if "DET-GAIN" in image.header else None + + @property + def catalog(self) -> Table: + return self._image.catalog + + def _update_background_header(self) -> None: + self._image.catalog[f"background"] = self._average_background + + def __call__(self, diameter: int) -> None: + import sep + + if self._is_background_calculated(): + self._calc_background() + self._update_background_header() + + radius = self._calc_aperture_radius_in_px(diameter) + flux, fluxerr, _ = sep.sum_circle( + self._data, self._pos_x, self._pos_y, radius, mask=self._image.safe_mask, err=self._image.safe_uncertainty, gain=self._gain + ) + self._update_flux_header(diameter, flux, fluxerr) + + def _is_background_calculated(self) -> None: + return self._data is None + + def _calc_background(self) -> None: + self._data, bkg = SepSourceDetection.remove_background(self._image.data, self._image.safe_mask) + self._average_background = self._calc_average_background(bkg.back()) + + def _calc_average_background(self, background: np.ndarray) -> \ + np.ndarray[float]: + """ + since SEP sums up whole pixels, we need to do the same on an image of ones for the background_area + """ + background_flux = self._sum_ellipse(background, self._image, self._pos_x, self._pos_y) + background_area = self._sum_ellipse(np.ones(shape=background.shape), self._image, self._pos_x, self._pos_y) + + median_background = np.divide(background_flux, background_area, where=background_area != 0) + return median_background + + @staticmethod + def _sum_ellipse(data: np.ndarray[float], image: Image, x: np.ndarray[float], y: np.ndarray[float]) -> np.ndarray[float]: + import sep + sum, _, _ = sep.sum_ellipse( + data, x, y, + image.catalog["a"], + image.catalog["b"], + theta=np.pi / 2.0, + r=2.5 * image.catalog["kronrad"], + subpix=1 + ) + return sum + + def _calc_aperture_radius_in_px(self, diameter: int) -> float: + radius = diameter / 2.0 + return radius / self._image.pixel_scale + + def _update_flux_header(self, diameter: int, flux: np.ndarray, fluxerr: np.ndarray[float]) -> None: + self._image.catalog[f"fluxaper{diameter}"] = flux + self._image.catalog[f"fluxerr{diameter}"] = fluxerr \ No newline at end of file diff --git a/pyobs/images/processors/photometry/aperture_photometry.py b/pyobs/images/processors/photometry/aperture_photometry.py new file mode 100644 index 00000000..8b53437e --- /dev/null +++ b/pyobs/images/processors/photometry/aperture_photometry.py @@ -0,0 +1,52 @@ +import asyncio +import logging +from typing import Any + +from pyobs.images import Image +from pyobs.images.processors.photometry import Photometry +from pyobs.images.processors.photometry._photometry_calculator import _PhotometryCalculator + +log = logging.getLogger(__name__) + + +class AperturePhotometry(Photometry): + APERTURE_RADII = range(1, 9) + + def __init__(self, calculator: _PhotometryCalculator, **kwargs: Any) -> None: + self._calculator = calculator + super().__init__(**kwargs) + + async def __call__(self, image: Image) -> Image: + """Do aperture photometry on given image. + + Args: + image: Image to do aperture photometry on. + + Returns: + Image with attached catalog. + """ + + if image.safe_data is None: + log.warning("No data found in image.") + return image + + if image.pixel_scale is None: + log.warning("No pixel scale provided by image.") + return image + + if image.safe_catalog is None: + log.warning("No catalog found in image.") + return image + + self._calculator.set_data(image) + + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._photometry) + + output_image = image.copy() + output_image.catalog = self._calculator.catalog + return output_image + + def _photometry(self) -> None: + for diameter in self.APERTURE_RADII: + self._calculator(diameter) diff --git a/pyobs/images/processors/photometry/photometry.py b/pyobs/images/processors/photometry/photometry.py index ca63236b..5ec30349 100644 --- a/pyobs/images/processors/photometry/photometry.py +++ b/pyobs/images/processors/photometry/photometry.py @@ -19,7 +19,6 @@ async def __call__(self, image: Image) -> Image: Returns: Image with attached catalog. """ - ... __all__ = ["Photometry"] diff --git a/pyobs/images/processors/photometry/photutil.py b/pyobs/images/processors/photometry/photutil.py index 4356a07b..0d3e61b5 100644 --- a/pyobs/images/processors/photometry/photutil.py +++ b/pyobs/images/processors/photometry/photutil.py @@ -1,117 +1,19 @@ -import asyncio -from functools import partial -from typing import Any -from astropy.stats import sigma_clipped_stats import logging -import numpy as np -from photutils import CircularAnnulus, CircularAperture, aperture_photometry +from typing import Any -from .photometry import Photometry -from pyobs.images import Image +from ._photutil_aperture_photometry import _PhotUtilAperturePhotometry +from .aperture_photometry import AperturePhotometry log = logging.getLogger(__name__) -class PhotUtilsPhotometry(Photometry): +class PhotUtilsPhotometry(AperturePhotometry): """Perform photometry using PhotUtils.""" __module__ = "pyobs.images.processors.photometry" - def __init__( - self, - threshold: float = 1.5, - minarea: int = 5, - deblend_nthresh: int = 32, - deblend_cont: float = 0.005, - clean: bool = True, - clean_param: float = 1.0, - **kwargs: Any, - ): - """Initializes an aperture photometry based on PhotUtils. - - Args: - threshold: Threshold pixel value for detection. - minarea: Minimum number of pixels required for detection. - deblend_nthresh: Number of thresholds used for object deblending. - deblend_cont: Minimum contrast ratio used for object deblending. - clean: Perform cleaning? - clean_param: Cleaning parameter (see SExtractor manual). - *args: - **kwargs: - """ - Photometry.__init__(self, **kwargs) - - # store - self.threshold = threshold - self.minarea = minarea - self.deblend_nthresh = deblend_nthresh - self.deblend_cont = deblend_cont - self.clean = clean - self.clean_param = clean_param - - async def __call__(self, image: Image) -> Image: - """Do aperture photometry on given image. - - Args: - image: Image to do aperture photometry on. - - Returns: - Image with attached catalog. - """ - loop = asyncio.get_running_loop() - - # no pixel scale given? - if image.pixel_scale is None: - log.warning("No pixel scale provided by image.") - return image - - # fetch catalog - if image.safe_catalog is None: - log.warning("No catalog in image.") - return image - sources = image.catalog.copy() - - # get positions - positions = [(x - 1, y - 1) for x, y in sources.iterrows("x", "y")] - - # perform aperture photometry for diameters of 1" to 8" - for diameter in [1, 2, 3, 4, 5, 6, 7, 8]: - # extraction radius in pixels - radius = diameter / 2.0 / image.pixel_scale - if radius < 1: - continue - - # defines apertures - aperture = CircularAperture(positions, r=radius) - annulus_aperture = CircularAnnulus(positions, r_in=2 * radius, r_out=3 * radius) - annulus_masks = annulus_aperture.to_mask(method="center") - - # loop annuli - bkg_median = [] - for m in annulus_masks: - annulus_data = m.multiply(image.data) - annulus_data_1d = annulus_data[m.data > 0] - _, median_sigclip, _ = sigma_clipped_stats(annulus_data_1d) - bkg_median.append(median_sigclip) - - # do photometry - phot = await loop.run_in_executor( - None, - partial(aperture_photometry, image.data, aperture, mask=image.safe_mask, error=image.safe_uncertainty), - ) - - # calc flux - bkg_median_np = np.array(bkg_median) - aper_bkg = bkg_median_np * aperture.area - sources["fluxaper%d" % diameter] = phot["aperture_sum"] - aper_bkg - if "aperture_sum_err" in phot.columns: - sources["fluxerr%d" % diameter] = phot["aperture_sum_err"] - sources["bkgaper%d" % diameter] = bkg_median_np - - # copy image, set catalog and return it - img = image.copy() - img.catalog = sources - return img + def __init__(self, **kwargs: Any) -> None: + super().__init__(_PhotUtilAperturePhotometry(), **kwargs) __all__ = ["PhotUtilsPhotometry"] diff --git a/pyobs/images/processors/photometry/pysep.py b/pyobs/images/processors/photometry/pysep.py index bb765079..d0d87a36 100644 --- a/pyobs/images/processors/photometry/pysep.py +++ b/pyobs/images/processors/photometry/pysep.py @@ -1,154 +1,19 @@ -import asyncio -from typing import Any import logging -import numpy as np +from typing import Any -from .photometry import Photometry -from pyobs.images import Image +from ._sep_aperture_photometry import _SepAperturePhotometry +from .aperture_photometry import AperturePhotometry log = logging.getLogger(__name__) -class SepPhotometry(Photometry): +class SepPhotometry(AperturePhotometry): """Perform photometry using SEP.""" __module__ = "pyobs.images.processors.photometry" - def __init__( - self, - threshold: float = 1.5, - minarea: int = 5, - deblend_nthresh: int = 32, - deblend_cont: float = 0.005, - clean: bool = True, - clean_param: float = 1.0, - **kwargs: Any, - ): - """Initializes a wrapper for SEP. See its documentation for details. - - Highly inspired by LCO's wrapper for SEP, see: - https://github.com/LCOGT/banzai/blob/master/banzai/photometry.py - - Args: - threshold: Threshold pixel value for detection. - minarea: Minimum number of pixels required for detection. - deblend_nthresh: Number of thresholds used for object deblending. - deblend_cont: Minimum contrast ratio used for object deblending. - clean: Perform cleaning? - clean_param: Cleaning parameter (see SExtractor manual). - *args: - **kwargs: - """ - Photometry.__init__(self, **kwargs) - - # store - self.threshold = threshold - self.minarea = minarea - self.deblend_nthresh = deblend_nthresh - self.deblend_cont = deblend_cont - self.clean = clean - self.clean_param = clean_param - - async def __call__(self, image: Image) -> Image: - """Do aperture photometry on given image. - - Args: - image: Image to do aperture photometry on. - - Returns: - Image with attached catalog. - """ - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, SepPhotometry._photometry, image) - - @staticmethod - def _photometry(image: Image) -> Image: - import sep - from pyobs.images.processors.detection import SepSourceDetection - - # check data - if image.safe_data is None: - log.warning("No data found in image.") - return image - if image.safe_catalog is None: - log.warning("No catalog found in image.") - return image - - # remove background - data, bkg = SepSourceDetection.remove_background(image.data, image.safe_mask) - - # fetch catalog - sources = image.catalog.copy() - - # match SEP conventions - x, y = sources["x"] - 1, sources["y"] - 1 - - # get gain - gain = image.header["DET-GAIN"] if "DET-GAIN" in image.header else None - - # perform aperture photometry for diameters of 1" to 8" - for diameter in [1, 2, 3, 4, 5, 6, 7, 8]: - if image.pixel_scale is not None: - flux, fluxerr, flag = sep.sum_circle( - data, - x, - y, - diameter / 2.0 / image.pixel_scale, - mask=image.safe_mask, - err=image.safe_uncertainty, - gain=gain, - ) - sources["fluxaper{0}".format(diameter)] = flux - sources["fluxerr{0}".format(diameter)] = fluxerr - - else: - sources["fluxaper{0}".format(diameter)] = 0 - sources["fluxerr{0}".format(diameter)] = 0 - - # average background at each source - # since SEP sums up whole pixels, we need to do the same on an image of ones for the background_area - bkgflux, fluxerr, flag = sep.sum_ellipse( - bkg.back(), x, y, sources["a"], sources["b"], np.pi / 2.0, 2.5 * sources["kronrad"], subpix=1 - ) - background_area, _, _ = sep.sum_ellipse( - np.ones(shape=bkg.back().shape), - x, - y, - sources["a"], - sources["b"], - np.pi / 2.0, - 2.5 * sources["kronrad"], - subpix=1, - ) - sources["background"] = bkgflux - sources["background"][background_area > 0] /= background_area[background_area > 0] - - # pick columns for catalog - new_columns = [ - "fluxaper1", - "fluxerr1", - "fluxaper2", - "fluxerr2", - "fluxaper3", - "fluxerr3", - "fluxaper4", - "fluxerr4", - "fluxaper5", - "fluxerr5", - "fluxaper6", - "fluxerr6", - "fluxaper7", - "fluxerr7", - "fluxaper8", - "fluxerr8", - "background", - ] - cat = sources[image.catalog.colnames + new_columns] - - # copy image, set catalog and return it - img = image.copy() - img.catalog = cat - return img + def __init__(self, **kwargs: Any) -> None: + super().__init__(_SepAperturePhotometry(), **kwargs) __all__ = ["SepPhotometry"] diff --git a/tests/images/processors/detection/test_pysep.py b/tests/images/processors/detection/test_pysep.py index 231c59f2..2cc8c6ac 100644 --- a/tests/images/processors/detection/test_pysep.py +++ b/tests/images/processors/detection/test_pysep.py @@ -40,6 +40,13 @@ def test_init(): assert detector.clean_param == clean_param +@pytest.mark.asyncio +async def test_remove_const_background(): + data, bkg = SepSourceDetection.remove_background(data=np.ones((100, 100))) + np.testing.assert_array_almost_equal(data, np.zeros((100, 100)), 13) + np.testing.assert_array_almost_equal(bkg.back(), np.ones((100, 100)), 13) + + @pytest.mark.asyncio async def test_full(gaussian_sources_image): detector = SepSourceDetection() diff --git a/tests/images/processors/photometry/__init__.py b/tests/images/processors/photometry/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/images/processors/photometry/conftest.py b/tests/images/processors/photometry/conftest.py new file mode 100644 index 00000000..61f76c48 --- /dev/null +++ b/tests/images/processors/photometry/conftest.py @@ -0,0 +1,14 @@ +import numpy as np +import pytest +from astropy.io.fits import Header +from astropy.table import QTable + +from pyobs.images import Image + + +@pytest.fixture(scope="module") +def const_test_image() -> Image: + data = np.ones((100, 100)) + header = Header({"CD1_1": 1.0}) + catalog = QTable({"x": [40.0], "y": [40.0]}) + return Image(data=data, header=header, catalog=catalog) diff --git a/tests/images/processors/photometry/test_aperture_photometry.py b/tests/images/processors/photometry/test_aperture_photometry.py new file mode 100644 index 00000000..7454650f --- /dev/null +++ b/tests/images/processors/photometry/test_aperture_photometry.py @@ -0,0 +1,74 @@ +import logging +from typing import List, Tuple + +import numpy as np +import pytest +from astropy.io.fits import Header +from astropy.table import QTable + +from pyobs.images import Image +from pyobs.images.processors.photometry._photometry_calculator import _PhotometryCalculator +from pyobs.images.processors.photometry.aperture_photometry import AperturePhotometry + + +class MockPhotometryCalculator(_PhotometryCalculator): + def __init__(self): + self._catalog = None + + @property + def catalog(self) -> QTable: + return self._catalog + + def set_data(self, image: Image): + self._catalog = image.catalog.copy() + + def __call__(self, diameter: int): + self._catalog[f"call{diameter}"] = 1 + + +@pytest.mark.asyncio +async def test_call_invalid_data(caplog): + image = Image() + photometry = AperturePhotometry(MockPhotometryCalculator()) + + with caplog.at_level(logging.WARNING): + result = await photometry(image) + + assert caplog.records[0].message == "No data found in image." + assert result == image + + +@pytest.mark.asyncio +async def test_call_invalid_pixelscale(caplog): + image = Image(data=np.zeros((1, 1))) + photometry = AperturePhotometry(MockPhotometryCalculator()) + + with caplog.at_level(logging.WARNING): + result = await photometry(image) + + assert caplog.records[0].message == "No pixel scale provided by image." + assert result == image + + +@pytest.mark.asyncio +async def test_call_invalid_catalog(caplog): + image = Image(data=np.zeros((1, 1)), header=Header({"CD1_1": 1.0})) + photometry = AperturePhotometry(MockPhotometryCalculator()) + + with caplog.at_level(logging.WARNING): + result = await photometry(image) + + assert caplog.records[0].message == "No catalog found in image." + assert result == image + + +@pytest.mark.asyncio +async def test_call_valid(const_test_image): + calculator = MockPhotometryCalculator() + photometry = AperturePhotometry(calculator) + result = await photometry(const_test_image) + + assert all([f"call{x}" in result.catalog.keys() for x in AperturePhotometry.APERTURE_RADII]) + + np.testing.assert_array_equal(const_test_image.data, result.data) + assert const_test_image.catalog is not result.catalog diff --git a/tests/images/processors/photometry/test_photutil.py b/tests/images/processors/photometry/test_photutil.py new file mode 100644 index 00000000..c0d5c487 --- /dev/null +++ b/tests/images/processors/photometry/test_photutil.py @@ -0,0 +1,7 @@ +from pyobs.images.processors.photometry import PhotUtilsPhotometry +from pyobs.images.processors.photometry._photutil_aperture_photometry import _PhotUtilAperturePhotometry + + +def test_init(): + photometry = PhotUtilsPhotometry() + assert isinstance(photometry._calculator, _PhotUtilAperturePhotometry) diff --git a/tests/images/processors/photometry/test_photutils_aperature_photometry.py b/tests/images/processors/photometry/test_photutils_aperature_photometry.py new file mode 100644 index 00000000..17dfdde8 --- /dev/null +++ b/tests/images/processors/photometry/test_photutils_aperature_photometry.py @@ -0,0 +1,40 @@ +import numpy as np +import pytest + +from pyobs.images.processors.photometry._photutil_aperture_photometry import _PhotUtilAperturePhotometry + + +@pytest.mark.asyncio +async def test_call_to_large_pixel_scale(caplog, const_test_image): + photometry = _PhotUtilAperturePhotometry() + photometry.set_data(const_test_image) + + photometry(1) + + assert photometry.catalog.keys() == ["x", "y"] + assert photometry.catalog["x"] == [40] + assert photometry.catalog["y"] == [40] + + +@pytest.mark.asyncio +async def test_call_const(const_test_image): + const_test_image.header["CD1_1"] = 1 / (2.5 * 3600) + photometry = _PhotUtilAperturePhotometry() + photometry.set_data(const_test_image) + photometry(1) + + # Test background is 1.0 + np.testing.assert_almost_equal(photometry.catalog[f"bkgaper1"][0], 1.0, 14) + + # Test flux is 0.0 + np.testing.assert_almost_equal(photometry.catalog[f"fluxaper1"][0], 0.0, 13) + + +@pytest.mark.asyncio +async def test_update_header_flux_error(const_test_image): + photometry = _PhotUtilAperturePhotometry() + photometry.set_data(const_test_image) + + test_array = np.array([1.0]) + photometry._update_catalog(1, test_array, test_array, test_array) + np.testing.assert_array_equal(photometry.catalog["fluxerr1"], test_array) \ No newline at end of file diff --git a/tests/images/processors/photometry/test_pysep.py b/tests/images/processors/photometry/test_pysep.py new file mode 100644 index 00000000..2898c07c --- /dev/null +++ b/tests/images/processors/photometry/test_pysep.py @@ -0,0 +1,7 @@ +from pyobs.images.processors.photometry import SepPhotometry +from pyobs.images.processors.photometry._sep_aperture_photometry import _SepAperturePhotometry + + +def test_init(): + photometry = SepPhotometry() + assert isinstance(photometry._calculator, _SepAperturePhotometry) diff --git a/tests/images/processors/photometry/test_pysep_aperture_photometry.py b/tests/images/processors/photometry/test_pysep_aperture_photometry.py new file mode 100644 index 00000000..3ac44136 --- /dev/null +++ b/tests/images/processors/photometry/test_pysep_aperture_photometry.py @@ -0,0 +1,49 @@ +import numpy as np +import pytest +from astropy.table import QTable + +from pyobs.images import Image +from pyobs.images.processors.photometry._sep_aperture_photometry import _SepAperturePhotometry + + +@pytest.fixture() +def test_catalog(): + catalog = QTable({"x": [40], "y": [40], "a": [10], "b": [5], "kronrad": [5]}) + return catalog + + +@pytest.mark.asyncio +async def test_call_const(test_catalog): + const_test_image = Image(data=np.ones((100, 100)), catalog=test_catalog) + photometry = _SepAperturePhotometry() + const_test_image.header["CD1_1"] = 1/(2.5*3600) + + photometry.set_data(const_test_image) + photometry(5) + + # Test background is 1.0 + np.testing.assert_almost_equal(photometry.catalog["background"][0], 1.0, 14) + + # Test flux is 0.0 + assert abs(photometry.catalog[f"fluxaper5"][0] - 0.0) < 1e-13 + assert abs(photometry.catalog[f"fluxerr5"][0] - 0.0) < 1e-13 + + +@pytest.mark.asyncio +async def test_call_single_peak(test_catalog): + data = np.zeros((100, 100)) + data[39][39] = 100 + const_test_image = Image(data=data, catalog=test_catalog) + photometry = _SepAperturePhotometry() + const_test_image.header["CD1_1"] = 1/(2.5*3600) + photometry.set_data(const_test_image) + photometry(5) + + # Test background is 0.0 + np.testing.assert_almost_equal(photometry.catalog["background"][0], 0.0, 14) + + # Test flux is 100.0 + assert abs(photometry.catalog[f"fluxaper5"][0] - 100.0) < 1e-13 + assert abs(photometry.catalog[f"fluxerr5"][0] - 0.0) < 1e-13 + +