diff --git a/pyobs/images/meta/genericoffset.py b/pyobs/images/meta/genericoffset.py new file mode 100644 index 00000000..7f7cf61b --- /dev/null +++ b/pyobs/images/meta/genericoffset.py @@ -0,0 +1,4 @@ +class GenericOffset: + def __init__(self, dx: float, dy: float) -> None: + self.dx: float = dx + self.dy: float = dy diff --git a/pyobs/images/processors/offsets/astrometry.py b/pyobs/images/processors/offsets/astrometry.py index cbe06ef3..314cdc50 100644 --- a/pyobs/images/processors/offsets/astrometry.py +++ b/pyobs/images/processors/offsets/astrometry.py @@ -1,5 +1,5 @@ import logging -from typing import Any +from typing import Any, Tuple, Optional from astropy.coordinates import SkyCoord from astropy.wcs import WCS import astropy.units as u @@ -25,6 +25,9 @@ def __init__(self, **kwargs: Any): """ Offsets.__init__(self, **kwargs) + self._image: Optional[Image] = None + self._wcs: Optional[WCS] = None + async def __call__(self, image: Image) -> Image: """Processes an image and sets x/y pixel offset to reference in offset attribute. @@ -38,23 +41,27 @@ async def __call__(self, image: Image) -> Image: ValueError: If offset could not be found. """ - # copy image and get WCS - # we make our life a little easier by only using the new WCS from astrometry - img = image.copy() - wcs = WCS(img.header) + self._image = image.copy() + self._wcs = WCS(image.header) + + center_sky_coord, center_pixel_coord = self._get_coordinates_from_header(("CRVAL1", "CRVAL2")) + teleskope_sky_coord, telescope_pixel_coord = self._get_coordinates_from_header(("TEL-RA", "TEL-DEC")) + + offset = telescope_pixel_coord[0] - center_pixel_coord[0], telescope_pixel_coord[1] - center_pixel_coord[1] + on_sky_distance = center_sky_coord.separation(teleskope_sky_coord) - # get x/y coordinates from CRVAL1/2, i.e. from center with good WCS - center = SkyCoord(img.header["CRVAL1"] * u.deg, img.header["CRVAL2"] * u.deg, frame="icrs") - x_center, y_center = wcs.world_to_pixel(center) + self._image.set_meta(PixelOffsets(*offset)) + self._image.set_meta(OnSkyDistance(on_sky_distance)) + return self._image - # get x/y coordinates from TEL-RA/-DEC, i.e. from where the telescope thought it's pointing - tel = SkyCoord(img.header["TEL-RA"] * u.deg, img.header["TEL-DEC"] * u.deg, frame="icrs") - x_tel, y_tel = wcs.world_to_pixel(tel) + def _get_coordinates_from_header(self, header_cards: Tuple[str, str]) -> Tuple[SkyCoord, Tuple[float, float]]: + coordinates = SkyCoord( + self._image.header[header_cards[0]] * u.deg, # type: ignore + self._image.header[header_cards[1]] * u.deg, # type: ignore + frame="icrs") - # calculate offsets as difference between both - img.set_meta(PixelOffsets(x_tel - x_center, y_tel - y_center)) - img.set_meta(OnSkyDistance(center.separation(tel))) - return img + pixel_coordinates = self._wcs.world_to_pixel(coordinates) # type: ignore + return coordinates, pixel_coordinates __all__ = ["AstrometryOffsets"] diff --git a/pyobs/images/processors/offsets/brighteststar.py b/pyobs/images/processors/offsets/brighteststar.py index c55a6c39..eacd291c 100644 --- a/pyobs/images/processors/offsets/brighteststar.py +++ b/pyobs/images/processors/offsets/brighteststar.py @@ -1,7 +1,8 @@ import logging from typing import Tuple, Any -from astropy.coordinates import SkyCoord +from astropy.coordinates import Angle +from astropy.table import Table, Row from astropy.wcs import WCS from pyobs.images import Image @@ -16,12 +17,11 @@ class BrightestStarOffsets(Offsets): __module__ = "pyobs.images.processors.offsets" - def __init__(self, center: Tuple[str, str] = ("CRPIX1", "CRPIX2"), **kwargs: Any): + def __init__(self, center_header_cards: Tuple[str, str] = ("CRPIX1", "CRPIX2"), **kwargs: Any): """Initializes a new auto guiding system.""" Offsets.__init__(self, **kwargs) - # init - self._center = center + self._center_header_cards = center_header_cards async def __call__(self, image: Image) -> Image: """Processes an image and sets x/y pixel offset to reference in offset attribute. @@ -36,31 +36,33 @@ async def __call__(self, image: Image) -> Image: ValueError: If offset could not be found. """ - # get catalog and sort by flux - cat = image.safe_catalog - if cat is None or len(cat) < 1: + catalog = image.safe_catalog + if catalog is None or len(catalog) < 1: log.warning("No catalog found in image.") return image - cat.sort("flux", reverse=True) - # get first X/Y coordinates - x, y = cat["x"][0], cat["y"][0] + star_pos = self._get_brightest_star_position(catalog) + center = image.header[self._center_header_cards[0]], image.header[self._center_header_cards[1]] - # get center - center_x, center_y = image.header[self._center[0]], image.header[self._center[1]] + offset = (star_pos[0] - center[0], star_pos[1] - center[1]) + on_sky_distance = self._calc_on_sky_distance(image, center, star_pos) - # calculate offset - dx, dy = x - center_x, y - center_y + image.set_meta(PixelOffsets(*offset)) + image.set_meta(OnSkyDistance(on_sky_distance)) + return image + + @staticmethod + def _get_brightest_star_position(catalog: Table) -> Tuple[float, float]: + brightest_star: Row = max(catalog, key=lambda row: row["flux"]) + return brightest_star["x"], brightest_star["y"] - # get distance on sky + @staticmethod + def _calc_on_sky_distance(image: Image, center: Tuple[float, float], star_pos: Tuple[float, float]) -> Angle: wcs = WCS(image.header) - coords1 = wcs.pixel_to_world(center_x, center_y) - coords2 = wcs.pixel_to_world(center_x + dx, center_y + dy) + center_coordinates = wcs.pixel_to_world(*center) + star_coordinates = wcs.pixel_to_world(*star_pos) - # set it and return image - image.set_meta(PixelOffsets(dx, dy)) - image.set_meta(OnSkyDistance(coords1.separation(coords2))) - return image + return center_coordinates.separation(star_coordinates) __all__ = ["BrightestStarOffsets"] diff --git a/pyobs/images/processors/offsets/dummyoffsets.py b/pyobs/images/processors/offsets/dummyoffsets.py index f2492f03..47905715 100644 --- a/pyobs/images/processors/offsets/dummyoffsets.py +++ b/pyobs/images/processors/offsets/dummyoffsets.py @@ -1,10 +1,12 @@ +from typing import Any + from .offsets import Offsets from pyobs.images import Image from pyobs.object import get_class_from_string class DummyOffsets(Offsets): - def __init__(self, offset_class: str, offset: float = 1.0, **kwargs): + def __init__(self, offset_class: str, offset: float = 1.0, **kwargs: Any) -> None: super().__init__(**kwargs) self._offset = offset diff --git a/pyobs/images/processors/offsets/fitsheader.py b/pyobs/images/processors/offsets/fitsheader.py index f138aa9d..0c289dd6 100644 --- a/pyobs/images/processors/offsets/fitsheader.py +++ b/pyobs/images/processors/offsets/fitsheader.py @@ -4,6 +4,7 @@ from pyobs.images import Image from . import Offsets +from ...meta.genericoffset import GenericOffset log = logging.getLogger(__name__) @@ -15,7 +16,9 @@ def __init__(self, target: Tuple[str, str], center: Tuple[str, str] = ("DET-CPX1 """Initializes new fits header offsets.""" Offsets.__init__(self, **kwargs) - # store + if len(target) != 2 or len(center) != 2: + raise ValueError("Target and center must be of length 2!") + self.center = center self.target = target @@ -32,13 +35,14 @@ async def __call__(self, image: Image) -> Image: ValueError: If offset could not be found. """ - # get values from header target = [image.header[x] for x in self.target] center = [image.header[x] for x in self.center] - # calculate offset - image.meta["offsets"] = np.subtract(target, center) - return image + offset = np.subtract(target, center) + + output_image = image.copy() + output_image.set_meta(GenericOffset(*offset)) + return output_image __all__ = ["FitsHeaderOffsets"] diff --git a/pyobs/images/processors/offsets/nstar.py b/pyobs/images/processors/offsets/nstar.py index 371f6132..33ce788b 100644 --- a/pyobs/images/processors/offsets/nstar.py +++ b/pyobs/images/processors/offsets/nstar.py @@ -86,15 +86,15 @@ async def __call__(self, image: Image) -> Image: self.ref_boxes = await self._boxes_from_ref(image, star_box_size) # reset and finish - image.meta["offsets"] = (0, 0) + image.set_meta(PixelOffsets(0.0, 0.0)) return image except ValueError as e: # didn't work log.warning(f"Could not initialize reference image info due to exception '{e}'. Resetting...") await self.reset() - if "offsets" in image.meta: - del image.meta["offsets"] + if PixelOffsets in image.meta: + del image.meta[PixelOffsets] self.offset = None, None return image diff --git a/pyproject.toml b/pyproject.toml index 37332467..464f2328 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "pyobs-core" packages = [{ include = "pyobs" }] -version = "1.12.2" +version = "1.12.3" description = "robotic telescope software" authors = ["Tim-Oliver Husser "] license = "MIT" diff --git a/tests/images/processors/offsets/__init__.py b/tests/images/processors/offsets/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/images/processors/offsets/test_astrometry.py b/tests/images/processors/offsets/test_astrometry.py new file mode 100644 index 00000000..112ef7a5 --- /dev/null +++ b/tests/images/processors/offsets/test_astrometry.py @@ -0,0 +1,30 @@ +import numpy as np +import pytest +from astropy.io import fits +from astropy.utils.data import get_pkg_data_filename + +from pyobs.images import Image +from pyobs.images.meta import PixelOffsets, OnSkyDistance +from pyobs.images.processors.offsets import AstrometryOffsets + + +@pytest.mark.asyncio +async def test_call() -> None: + filename = get_pkg_data_filename('data/j94f05bgq_flt.fits', package='astropy.wcs.tests') + fits_file = fits.open(filename) + header = fits_file[1].header + header["TEL-RA"] = 5.63 + header["TEL-DEC"] = -72.05 + + image = Image(header=header) + + offsets = AstrometryOffsets() + + output_image = await offsets(image) + pixel_offset = output_image.get_meta(PixelOffsets) + + np.testing.assert_almost_equal(pixel_offset.dx, 128.94120449972797) + np.testing.assert_almost_equal(pixel_offset.dy, -309.1795167877043) + + on_sky_distance = output_image.get_meta(OnSkyDistance) + np.testing.assert_almost_equal(on_sky_distance.distance.value, 0.004575193216279022) diff --git a/tests/images/processors/offsets/test_brighteststar.py b/tests/images/processors/offsets/test_brighteststar.py new file mode 100644 index 00000000..5c551f35 --- /dev/null +++ b/tests/images/processors/offsets/test_brighteststar.py @@ -0,0 +1,60 @@ +import logging + +import numpy as np +import pytest +from astropy.io import fits +from astropy.table import QTable +from astropy.utils.data import get_pkg_data_filename + +from pyobs.images import Image +from pyobs.images.meta import PixelOffsets, OnSkyDistance +from pyobs.images.processors.offsets import BrightestStarOffsets + + +@pytest.mark.asyncio +async def test_missing_catalog(caplog: pytest.LogCaptureFixture) -> None: + offsets = BrightestStarOffsets() + image = Image() + + with caplog.at_level(logging.WARNING): + await offsets(image) + + assert caplog.messages[0] == "No catalog found in image." + + +@pytest.mark.asyncio +async def test_empty_catalog(caplog: pytest.LogCaptureFixture) -> None: + offsets = BrightestStarOffsets() + image = Image(catalog=QTable()) + + with caplog.at_level(logging.WARNING): + await offsets(image) + + assert caplog.messages[0] == "No catalog found in image." + + +@pytest.mark.asyncio +async def test_call() -> None: + fn = get_pkg_data_filename('data/j94f05bgq_flt.fits', package='astropy.wcs.tests') + f = fits.open(fn) + + catalog = QTable({"x": [2050], "y": [1020], "flux": [1]}) + image = Image(data=np.zeros((20, 20)), catalog=catalog, header=f[1].header) + + offsets = BrightestStarOffsets() + + output_image = await offsets(image) + pixel_offset = output_image.get_meta(PixelOffsets) + + assert pixel_offset.dx == 2.0 + assert pixel_offset.dy == -4.0 + + on_sky_distance = output_image.get_meta(OnSkyDistance) + np.testing.assert_almost_equal(on_sky_distance.distance.value, 6.06585686e-05) + + +@pytest.mark.asyncio +async def test_ordering() -> None: + catalog = QTable({"x": [2050, 2049], "y": [1020, 1021], "flux": [1, 2]}) + + assert BrightestStarOffsets._get_brightest_star_position(catalog) == (2049, 1021) diff --git a/tests/images/processors/offsets/test_dummyoffsets.py b/tests/images/processors/offsets/test_dummyoffsets.py new file mode 100644 index 00000000..36c21b32 --- /dev/null +++ b/tests/images/processors/offsets/test_dummyoffsets.py @@ -0,0 +1,16 @@ +import pytest + +from pyobs.images import Image +from pyobs.images.meta import PixelOffsets +from pyobs.images.processors.offsets import DummyOffsets + + +@pytest.mark.asyncio +async def test_dummy_offsets() -> None: + offsets = DummyOffsets("pyobs.images.meta.PixelOffsets", 10.0) + image = Image() + + output_image = await offsets(image) + offset = output_image.get_meta(PixelOffsets) + assert offset.dx == 10.0 + assert offset.dy == 10.0 diff --git a/tests/images/processors/offsets/test_fitsheader.py b/tests/images/processors/offsets/test_fitsheader.py new file mode 100644 index 00000000..01349adf --- /dev/null +++ b/tests/images/processors/offsets/test_fitsheader.py @@ -0,0 +1,35 @@ +import pytest +from astropy.io.fits import Header + +from pyobs.images import Image +from pyobs.images.meta.genericoffset import GenericOffset +from pyobs.images.processors.offsets import FitsHeaderOffsets + + +def test_attribute_validation() -> None: + with pytest.raises(ValueError): + FitsHeaderOffsets(("a", "b", "c"), ("a", "b", "c")) + + +@pytest.mark.asyncio +async def test_call() -> None: + header = Header({"A1": 4, "A2": 2, "B1": 2, "B2": 1}) + image = Image(header=header) + processor = FitsHeaderOffsets(("A1", "A2"), ("B1", "B2")) + result = await processor(image) + + offset = result.get_meta(GenericOffset) + assert offset.dx == 2 + assert offset.dy == 1 + + +@pytest.mark.asyncio +async def test_call_default() -> None: + header = Header({"A1": 4, "A2": 2, "DET-CPX1": 2, "DET-CPX2": 1}) + image = Image(header=header) + processor = FitsHeaderOffsets(("A1", "A2")) + result = await processor(image) + + offset = result.get_meta(GenericOffset) + assert offset.dx == 2 + assert offset.dy == 1 \ No newline at end of file