Skip to content

Commit

Permalink
Test photometry image processor modules (#314)
Browse files Browse the repository at this point in the history
* Added basic unit tests for photutil photometry

* Refactored photutil photomertry

* Extracted static functions from photutils photometry into its own class

* Added additional testcases for better coverage

* Added unit tests to sep photometry

* Improved sep photometry tests to verify mask behavior

* Refactored sep photometry

* Fixed background catalog entry when the pixel scale is missing

* Adapted handling of missing data between the photometry modules

* Adapted run_in_executor calls between photometry modules

* Extracted duplicate code from photometry methods to own class

* Added tests to aperture photometry

* Fixed reference check in aperture photometry test

* Removed redundant photometry tests

* Pulled getting the position from the catalog into the photometry calculators

* Changed to safe getter

* Fixed some typing errors
  • Loading branch information
GermanHydrogen authored Feb 14, 2024
1 parent ed72678 commit 9570a32
Show file tree
Hide file tree
Showing 15 changed files with 457 additions and 246 deletions.
23 changes: 23 additions & 0 deletions pyobs/images/processors/photometry/_photometry_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from abc import ABCMeta, abstractmethod
from typing import List, Tuple

from astropy.table import QTable

from pyobs.images import Image


class _PhotometryCalculator(metaclass=ABCMeta):
"""Abstract class for photometry calculators."""

@property
@abstractmethod
def catalog(self) -> QTable:
...

@abstractmethod
def set_data(self, image: Image):
...

@abstractmethod
def __call__(self, diameter: int):
...
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from typing import List, Tuple, Optional

import numpy as np
from astropy.stats import sigma_clipped_stats
from astropy.table import QTable
from photutils.aperture import CircularAperture, CircularAnnulus, ApertureMask, aperture_photometry

from pyobs.images import Image
from pyobs.images.processors.photometry._photometry_calculator import _PhotometryCalculator


class _PhotUtilAperturePhotometry(_PhotometryCalculator):

def __init__(self):
self._image: Optional[Image] = None
self._positions: Optional[List[Tuple[float, float]]] = None

def set_data(self, image: Image):
self._image = image.copy()
self._positions = [(x - 1, y - 1) for x, y in image.catalog.iterrows("x", "y")]

@property
def catalog(self):
return self._image.catalog

def __call__(self, diameter: int):
radius = self._calc_aperture_radius_in_px(diameter)
if radius < 1:
return

aperture = CircularAperture(self._positions, r=radius)
aperture_flux, aperture_error = self._calc_aperture_flux(aperture)

median_background = self._calc_median_backgrounds(radius)
aperture_background = self._calc_integrated_background(median_background, aperture)

corrected_aperture = self._background_correct_aperture_flux(aperture_flux, aperture_background)

self._update_catalog(diameter, corrected_aperture, aperture_error, median_background)

def _calc_aperture_radius_in_px(self, diameter: int):
radius = diameter / 2.0
return radius / self._image.pixel_scale

def _calc_median_backgrounds(self, radius: float) -> np.ndarray[float]:
annulus_aperture = CircularAnnulus(self._positions, r_in=2 * radius, r_out=3 * radius)
annulus_masks = annulus_aperture.to_mask(method="center")

bkg_median = [
self._calc_median_background(mask)
for mask in annulus_masks
]

return np.array(bkg_median)

def _calc_median_background(self, mask: ApertureMask) -> float:
annulus_data = mask.multiply(self._image.data)
annulus_data_1d = annulus_data[mask.data > 0]
_, sigma_clipped_median, _ = sigma_clipped_stats(annulus_data_1d)
return sigma_clipped_median

@staticmethod
def _calc_integrated_background(median_background: np.ndarray[float], aperture: CircularAperture) -> np.ndarray[float]:
return median_background * aperture.area

def _calc_aperture_flux(self, aperture: CircularAperture) -> Tuple[
np.ndarray[float], Optional[np.ndarray[float]]]:

phot: QTable = aperture_photometry(self._image.data, aperture, mask=self._image.safe_mask,
error=self._image.safe_uncertainty)

aperture_flux = phot["aperture_sum"]
aperture_error = phot["aperture_sum_err"] if "aperture_sum_err" in phot.keys() else None

return aperture_flux, aperture_error

@staticmethod
def _background_correct_aperture_flux(aperture_flux: np.ndarray[float], aperture_background: np.ndarray[float]) -> \
np.ndarray[float]:
return aperture_flux - aperture_background

def _update_catalog(self, diameter: int, corrected_aperture_flux: np.ndarray[float],
aperture_error: Optional[np.ndarray[float]], median_background: np.ndarray[float]):

self._image.catalog["fluxaper%d" % diameter] = corrected_aperture_flux
if aperture_error is not None:
self._image.catalog["fluxerr%d" % diameter] = aperture_error
self._image.catalog["bkgaper%d" % diameter] = median_background
84 changes: 84 additions & 0 deletions pyobs/images/processors/photometry/_sep_aperture_photometry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import List, Optional, Any

import numpy as np
from astropy.table import Table

from pyobs.images import Image
from pyobs.images.processors.detection import SepSourceDetection
from pyobs.images.processors.photometry._photometry_calculator import _PhotometryCalculator


class _SepAperturePhotometry(_PhotometryCalculator):
def __init__(self) -> None:
self._image: Optional[Image] = None
self._pos_x: Optional[List[float]] = None
self._pos_y: Optional[List[float]] = None

self._gain: Optional[float] = None

self._data: Optional[np.ndarray[int, Any]] = None
self._average_background: Optional[np.ndarray[int, float]] = None

def set_data(self, image: Image) -> None:
self._image = image.copy()
self._pos_x, self._pos_y = self._image.catalog["x"], self._image.catalog["y"]
self._gain = image.header["DET-GAIN"] if "DET-GAIN" in image.header else None

@property
def catalog(self) -> Table:
return self._image.catalog

def _update_background_header(self) -> None:
self._image.catalog[f"background"] = self._average_background

def __call__(self, diameter: int) -> None:
import sep

if self._is_background_calculated():
self._calc_background()
self._update_background_header()

radius = self._calc_aperture_radius_in_px(diameter)
flux, fluxerr, _ = sep.sum_circle(
self._data, self._pos_x, self._pos_y, radius, mask=self._image.safe_mask, err=self._image.safe_uncertainty, gain=self._gain
)
self._update_flux_header(diameter, flux, fluxerr)

def _is_background_calculated(self) -> None:
return self._data is None

def _calc_background(self) -> None:
self._data, bkg = SepSourceDetection.remove_background(self._image.data, self._image.safe_mask)
self._average_background = self._calc_average_background(bkg.back())

def _calc_average_background(self, background: np.ndarray) -> \
np.ndarray[float]:
"""
since SEP sums up whole pixels, we need to do the same on an image of ones for the background_area
"""
background_flux = self._sum_ellipse(background, self._image, self._pos_x, self._pos_y)
background_area = self._sum_ellipse(np.ones(shape=background.shape), self._image, self._pos_x, self._pos_y)

median_background = np.divide(background_flux, background_area, where=background_area != 0)
return median_background

@staticmethod
def _sum_ellipse(data: np.ndarray[float], image: Image, x: np.ndarray[float], y: np.ndarray[float]) -> np.ndarray[float]:
import sep
sum, _, _ = sep.sum_ellipse(
data, x, y,
image.catalog["a"],
image.catalog["b"],
theta=np.pi / 2.0,
r=2.5 * image.catalog["kronrad"],
subpix=1
)
return sum

def _calc_aperture_radius_in_px(self, diameter: int) -> float:
radius = diameter / 2.0
return radius / self._image.pixel_scale

def _update_flux_header(self, diameter: int, flux: np.ndarray, fluxerr: np.ndarray[float]) -> None:
self._image.catalog[f"fluxaper{diameter}"] = flux
self._image.catalog[f"fluxerr{diameter}"] = fluxerr
52 changes: 52 additions & 0 deletions pyobs/images/processors/photometry/aperture_photometry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
import logging
from typing import Any

from pyobs.images import Image
from pyobs.images.processors.photometry import Photometry
from pyobs.images.processors.photometry._photometry_calculator import _PhotometryCalculator

log = logging.getLogger(__name__)


class AperturePhotometry(Photometry):
APERTURE_RADII = range(1, 9)

def __init__(self, calculator: _PhotometryCalculator, **kwargs: Any) -> None:
self._calculator = calculator
super().__init__(**kwargs)

async def __call__(self, image: Image) -> Image:
"""Do aperture photometry on given image.
Args:
image: Image to do aperture photometry on.
Returns:
Image with attached catalog.
"""

if image.safe_data is None:
log.warning("No data found in image.")
return image

if image.pixel_scale is None:
log.warning("No pixel scale provided by image.")
return image

if image.safe_catalog is None:
log.warning("No catalog found in image.")
return image

self._calculator.set_data(image)

loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._photometry)

output_image = image.copy()
output_image.catalog = self._calculator.catalog
return output_image

def _photometry(self) -> None:
for diameter in self.APERTURE_RADII:
self._calculator(diameter)
1 change: 0 additions & 1 deletion pyobs/images/processors/photometry/photometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ async def __call__(self, image: Image) -> Image:
Returns:
Image with attached catalog.
"""
...


__all__ = ["Photometry"]
110 changes: 6 additions & 104 deletions pyobs/images/processors/photometry/photutil.py
Original file line number Diff line number Diff line change
@@ -1,117 +1,19 @@
import asyncio
from functools import partial
from typing import Any
from astropy.stats import sigma_clipped_stats
import logging
import numpy as np
from photutils import CircularAnnulus, CircularAperture, aperture_photometry
from typing import Any

from .photometry import Photometry
from pyobs.images import Image
from ._photutil_aperture_photometry import _PhotUtilAperturePhotometry
from .aperture_photometry import AperturePhotometry

log = logging.getLogger(__name__)


class PhotUtilsPhotometry(Photometry):
class PhotUtilsPhotometry(AperturePhotometry):
"""Perform photometry using PhotUtils."""

__module__ = "pyobs.images.processors.photometry"

def __init__(
self,
threshold: float = 1.5,
minarea: int = 5,
deblend_nthresh: int = 32,
deblend_cont: float = 0.005,
clean: bool = True,
clean_param: float = 1.0,
**kwargs: Any,
):
"""Initializes an aperture photometry based on PhotUtils.
Args:
threshold: Threshold pixel value for detection.
minarea: Minimum number of pixels required for detection.
deblend_nthresh: Number of thresholds used for object deblending.
deblend_cont: Minimum contrast ratio used for object deblending.
clean: Perform cleaning?
clean_param: Cleaning parameter (see SExtractor manual).
*args:
**kwargs:
"""
Photometry.__init__(self, **kwargs)

# store
self.threshold = threshold
self.minarea = minarea
self.deblend_nthresh = deblend_nthresh
self.deblend_cont = deblend_cont
self.clean = clean
self.clean_param = clean_param

async def __call__(self, image: Image) -> Image:
"""Do aperture photometry on given image.
Args:
image: Image to do aperture photometry on.
Returns:
Image with attached catalog.
"""
loop = asyncio.get_running_loop()

# no pixel scale given?
if image.pixel_scale is None:
log.warning("No pixel scale provided by image.")
return image

# fetch catalog
if image.safe_catalog is None:
log.warning("No catalog in image.")
return image
sources = image.catalog.copy()

# get positions
positions = [(x - 1, y - 1) for x, y in sources.iterrows("x", "y")]

# perform aperture photometry for diameters of 1" to 8"
for diameter in [1, 2, 3, 4, 5, 6, 7, 8]:
# extraction radius in pixels
radius = diameter / 2.0 / image.pixel_scale
if radius < 1:
continue

# defines apertures
aperture = CircularAperture(positions, r=radius)
annulus_aperture = CircularAnnulus(positions, r_in=2 * radius, r_out=3 * radius)
annulus_masks = annulus_aperture.to_mask(method="center")

# loop annuli
bkg_median = []
for m in annulus_masks:
annulus_data = m.multiply(image.data)
annulus_data_1d = annulus_data[m.data > 0]
_, median_sigclip, _ = sigma_clipped_stats(annulus_data_1d)
bkg_median.append(median_sigclip)

# do photometry
phot = await loop.run_in_executor(
None,
partial(aperture_photometry, image.data, aperture, mask=image.safe_mask, error=image.safe_uncertainty),
)

# calc flux
bkg_median_np = np.array(bkg_median)
aper_bkg = bkg_median_np * aperture.area
sources["fluxaper%d" % diameter] = phot["aperture_sum"] - aper_bkg
if "aperture_sum_err" in phot.columns:
sources["fluxerr%d" % diameter] = phot["aperture_sum_err"]
sources["bkgaper%d" % diameter] = bkg_median_np

# copy image, set catalog and return it
img = image.copy()
img.catalog = sources
return img
def __init__(self, **kwargs: Any) -> None:
super().__init__(_PhotUtilAperturePhotometry(), **kwargs)


__all__ = ["PhotUtilsPhotometry"]
Loading

0 comments on commit 9570a32

Please sign in to comment.