diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 19541a6a..b535fb5e 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -6,23 +6,26 @@ import sys from dataclasses import dataclass from datetime import datetime -from typing import Any, Sequence +from typing import TYPE_CHECKING, Any, Sequence -import aiokafka -import astropy.time -import httpx +import aiokafka # type: ignore +import astropy.time # type: ignore +import httpx # type: ignore import kafkit.registry -import kafkit.registry.httpx -import lsst.geom -import lsst.obs.lsst +import kafkit.registry.httpx # type: ignore +import lsst.geom # type: ignore +import lsst.obs.lsst # type: ignore import yaml from astro_metadata_translator import ObservationInfo -from lsst.obs.lsst.rawFormatter import LsstCamRawFormatter +from lsst.obs.lsst.rawFormatter import LsstCamRawFormatter # type: ignore from lsst.resources import ResourcePath from sqlalchemy import MetaData, Table from sqlalchemy.dialects.postgresql import insert from utils import setup_postgres +if TYPE_CHECKING: + import lsst.afw.cameraGeom # type: ignore + ############################### # Header Processing Functions # ############################### @@ -50,11 +53,16 @@ def logical_or(*bools: int | str | None) -> bool: return any([b == 1 or b == "1" for b in bools]) -def region(ra: float, dec: float, rotpa: float, points: list[tuple[str, int, int]]) -> str: - global camera, formatter +def region( + camera: lsst.afw.cameraGeom.Camera, + ra: float, + dec: float, + rotpa: float, + points: list[tuple[str, int, int]], +) -> str: region = "Polygon ICRS" for detector, offset_x, offset_y in points: - skywcs = formatter.makeRawSkyWcsFromBoresight( + skywcs = LsstCamRawFormatter.makeRawSkyWcsFromBoresight( lsst.geom.SpherePoint(ra, dec, lsst.geom.degrees), rotpa * lsst.geom.degrees, camera[detector], @@ -67,15 +75,19 @@ def region(ra: float, dec: float, rotpa: float, points: list[tuple[str, int, int return region -def ccdexposure_id(exposure_id: int, detector: int) -> int: - global translator +def ccdexposure_id( + translator: lsst.obs.lsst.translators.lsst.LsstBaseTranslator, exposure_id: int, detector: int +) -> int: return translator.compute_detector_exposure_id(exposure_id, detector) -def ccd_region(imgtype: str, ra: float, dec: float, rotpa: float, ccdname: str) -> str | None: +def ccd_region( + camera: lsst.afw.cameraGeom.Camera, imgtype: str, ra: float, dec: float, rotpa: float, ccdname: str +) -> str | None: if imgtype != "OBJECT": return None return region( + camera, ra, dec, rotpa, @@ -88,7 +100,9 @@ def ccd_region(imgtype: str, ra: float, dec: float, rotpa: float, ccdname: str) ) -def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None: +def fp_region( + camera: lsst.afw.cameraGeom.Camera, imgtype: str, ra: float, dec: float, rotpa: float +) -> str | None: global instrument if imgtype != "OBJECT": return None @@ -139,7 +153,7 @@ def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None: ] else: return None - return region(ra, dec, rotpa, corners) + return region(camera, ra, dec, rotpa, corners) ################################# @@ -188,7 +202,7 @@ def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None: "wind_dir": "WINDDIR", "dimm_seeing": "SEEING", "focus_z": "FOCUSZ", - "s_region": (fp_region, "IMGTYPE", "RA", "DEC", "ROTPA"), + "s_region": (fp_region, "camera", "IMGTYPE", "RA", "DEC", "ROTPA"), } # Instrument-specific mapping to column name from Header Service keyword @@ -206,6 +220,34 @@ def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None: ), } +LSSTCOMCAMSIM_MAPPING: dict[str, str | Sequence] = { + "simulated": ( + logical_or, + "SIMULATE MTMOUNT", + "SIMULATE MTM1M3", + "SIMULATE MTM2", + "SIMULATE CAMHEXAPOD", + "SIMULATE M2HEXAPOD", + "SIMULATE MTROTATOR", + "SIMULATE MTDOME", + "SIMULATE MTDOMETRAJECTORY", + ), +} + +LSSTCOMCAM_MAPPING: dict[str, str | Sequence] = { + "simulated": ( + logical_or, + "SIMULATE MTMOUNT", + "SIMULATE MTM1M3", + "SIMULATE MTM2", + "SIMULATE CAMHEXAPOD", + "SIMULATE M2HEXAPOD", + "SIMULATE MTROTATOR", + "SIMULATE MTDOME", + "SIMULATE MTDOMETRAJECTORY", + ), +} + LSSTCAM_MAPPING: dict[str, str | Sequence] = { "simulated": ( logical_or, @@ -221,10 +263,10 @@ def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None: } DETECTOR_MAPPING = { - "ccdexposure_id": (ccdexposure_id, "exposure_id", "detector"), + "ccdexposure_id": (ccdexposure_id, "translator", "exposure_id", "detector"), "exposure_id": "exposure_id", "detector": "detector", - "s_region": (ccd_region, "IMGTYPE", "RA", "DEC", "ROTPA", "ccdname"), + "s_region": (ccd_region, "camera", "IMGTYPE", "RA", "DEC", "ROTPA", "_CCDNAME"), } # Mapping to column name from ObservationInfo keyword @@ -239,7 +281,6 @@ def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None: TOPIC_MAPPING = { "LATISS": "ATHeaderService", "LSSTComCam": "CCHeaderService", - "LSSTComCamSim": "CCHeaderService", "LSSTCam": "MTHeaderService", } @@ -280,7 +321,7 @@ def process_column(column_def: str | Sequence, info: dict) -> Any: return fn(*arg_values) -def process_resource(resource: ResourcePath, update: bool = False) -> None: +def process_resource(resource: ResourcePath, instrument_dict: dict, update: bool = False) -> None: """Process a header resource. Uses configured mappings and the ObservationInfo translator to generate @@ -291,59 +332,68 @@ def process_resource(resource: ResourcePath, update: bool = False) -> None: resource : `ResourcePath` Path to the Header Service header resource. """ - global KW_MAPPING, OI_MAPPING, instrument_mapping, det_mapping, translator - global engine, exposure_table, ccdexposure_table + global KW_MAPPING, OI_MAPPING + global engine exposure_rec = dict() info = dict() content = yaml.safe_load(resource.read()) + for header in content["PRIMARY"]: info[header["keyword"]] = header["value"] + instrument_obj = instrument_dict[info["CONTRLLR"]] + info["camera"] = instrument_obj.camera + info["translator"] = instrument_obj.translator for column, column_def in KW_MAPPING.items(): exposure_rec[column] = process_column(column_def, info) - for column, column_def in instrument_mapping.items(): + for column, column_def in instrument_obj.instrument_mapping.items(): exposure_rec[column] = process_column(column_def, info) - obs_info_obj = ObservationInfo(info, translator_class=translator) + obs_info_obj = ObservationInfo(info, translator_class=instrument_obj.translator) obs_info = dict() for keyword in OI_MAPPING.values(): obs_info[keyword] = getattr(obs_info_obj, keyword) for field, keyword in OI_MAPPING.items(): exposure_rec[field] = process_column(keyword, obs_info) - stmt = insert(exposure_table).values(exposure_rec) + stmt = insert(instrument_obj.exposure_table).values(exposure_rec) if update: stmt = stmt.on_conflict_do_update(index_elements=["exposure_id"], set_=exposure_rec) else: - stmt = stmt.on_conflict_do_nothing() - logging.debug(str(stmt)) - with engine.begin() as conn: - conn.execute(stmt) - - detectors = [section for section in content if section.endswith("_PRIMARY")] - for detector in detectors: - det_exposure_rec = dict() - det_info = info.copy() - ccdname = f"{detector[0:3]}_{detector[3:6]}" - det_info["ccdname"] = ccdname - det_info["detector"] = camera[ccdname].getId() - for header in content[detector]: - det_info[header["keyword"]] = header["value"] - for column, column_def in det_mapping.items(): - det_exposure_rec[column] = process_column(column_def, det_info) - - stmt = insert(ccdexposure_table).values(det_exposure_rec) - if update: - stmt = stmt.on_conflict_do_update(index_elements=["ccdexposure_id"], set_=det_exposure_rec) - else: - stmt = stmt.on_conflict_do_nothing() + stmt.on_conflict_do_nothing() logging.debug(str(stmt)) with engine.begin() as conn: conn.execute(stmt) - -def process_date(day_obs: str, update: bool = False) -> None: + detectors = [section for section in content if section.endswith("_PRIMARY")] + for detector in detectors: + det_exposure_rec = dict() + det_info = info.copy() + ccdname = f"{detector[0:3]}_{detector[3:6]}" + if ccdname == "R00_S00" and instrument_obj.instrument_name == "latiss": + ccdname = "RXX_S00" + det_info["ccdname"] = ccdname + det_info["detector"] = instrument_obj.camera[ccdname].getId() + for header in content[detector]: + det_info[header["keyword"]] = header["value"] + for field, keyword in instrument_obj.det_mapping.items(): + det_exposure_rec[field] = process_column(keyword, det_info) + + det_stmt = insert(instrument_obj.ccdexposure_table).values(det_exposure_rec) + if update: + det_stmt = det_stmt.on_conflict_do_update( + index_elements=["ccdexposure_id"], set_=det_exposure_rec + ) + else: + det_stmt = det_stmt.on_conflict_do_nothing() + logging.debug(str(det_stmt)) + conn.execute(det_stmt) + + conn.commit() + + +def process_date(day_obs: str, instrument_dict: dict, update: bool = False) -> None: """Process all headers from a given observation day (as YYYY-MM-DD). Parameters @@ -357,7 +407,7 @@ def process_date(day_obs: str, update: bool = False) -> None: d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/") for dirpath, dirnames, filenames in d.walk(): for fname in filenames: - process_resource(d.join(fname), update) + process_resource(d.join(fname), instrument_dict, update) ################## @@ -365,7 +415,7 @@ def process_date(day_obs: str, update: bool = False) -> None: ################## -@dataclass +@dataclass(frozen=True) class KafkaConfig: """Class for configuring Kafka-related items.""" @@ -389,51 +439,36 @@ def get_kafka_config() -> KafkaConfig: logging.basicConfig(stream=sys.stderr, level=logging.INFO) instrument = os.environ["INSTRUMENT"] -# Cheat, since we know these are all derived from the same Instrument -formatter = LsstCamRawFormatter -match instrument: - case "LATISS": - from lsst.obs.lsst.translators import LatissTranslator - - translator = LatissTranslator - instrument_mapping = LATISS_MAPPING - det_mapping = DETECTOR_MAPPING - camera = lsst.obs.lsst.Latiss.getCamera() - case "LSSTComCam": - from lsst.obs.lsst.translators import LsstComCamTranslator - - translator = LsstComCamTranslator - instrument_mapping = LSSTCAM_MAPPING - det_mapping = DETECTOR_MAPPING - camera = lsst.obs.lsst.LsstComCam.getCamera() - case "LSSTComCamSim": - from lsst.obs.lsst.translators import LsstComCamSimTranslator - - translator = LsstComCamSimTranslator - instrument_mapping = LSSTCAM_MAPPING - det_mapping = DETECTOR_MAPPING - camera = lsst.obs.lsst.LsstComCamSim.getCamera() - case "LSSTCam": - from lsst.obs.lsst.translators import LsstCamTranslator - - translator = LsstCamTranslator - instrument_mapping = LSSTCAM_MAPPING - det_mapping = DETECTOR_MAPPING - camera = lsst.obs.lsst.LsstCam.getCamera() logging.info(f"Instrument = {instrument}") - -engine = setup_postgres() -metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") -exposure_table = Table("exposure", metadata_obj, autoload_with=engine) -ccdexposure_table = Table("ccdexposure", metadata_obj, autoload_with=engine) - - bucket_prefix = os.environ.get("BUCKET_PREFIX", "") if bucket_prefix: os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" -topic = f"lsst.sal.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable" +engine = setup_postgres() + + +@dataclass +class Instrument: + instrument_name: str + translator: lsst.obs.lsst.translators.lsst.LsstBaseTranslator + instrument_mapping: dict + det_mapping: dict + camera: lsst.afw.cameraGeom.Camera + metadata_obj: MetaData + exposure_table: Table + ccdexposure_table: Table + + def __init__(self, instrument_name, translator, instrument_mapping, det_mapping, camera): + global engine + self.instrument_name = instrument_name + self.translator = translator + self.instrument_mapping = instrument_mapping + self.det_mapping = det_mapping + self.camera = camera + self.metadata_obj = MetaData(schema=f"cdb_{instrument_name}") + self.exposure_table = Table("exposure", self.metadata_obj, autoload_with=engine) + self.ccdexposure_table = Table("ccdexposure", self.metadata_obj, autoload_with=engine) ################# @@ -443,8 +478,49 @@ def get_kafka_config() -> KafkaConfig: async def main() -> None: """Handle Header Service largeFileObjectAvailable messages.""" - global bucket_prefix + global instrument, bucket_prefix, TOPIC_MAPPING + + if instrument == "LATISS": + instrument_dict = { + "O": Instrument( + "latiss", + lsst.obs.lsst.translators.LatissTranslator, + LATISS_MAPPING, + DETECTOR_MAPPING, + lsst.obs.lsst.Latiss.getCamera(), + ), + } + elif instrument == "LSSTComCam": + instrument_dict = { + "O": Instrument( + "lsstcomcam", + lsst.obs.lsst.translators.LsstComCamTranslator, + LSSTCOMCAM_MAPPING, + DETECTOR_MAPPING, + lsst.obs.lsst.LsstComCam.getCamera(), + ), + "S": Instrument( + "lsstcomcamsim", + lsst.obs.lsst.translators.LsstComCamSimTranslator, + LSSTCOMCAMSIM_MAPPING, + DETECTOR_MAPPING, + lsst.obs.lsst.LsstComCamSim.getCamera(), + ), + } + elif instrument == "LSSTCam": + instrument_dict = { + "O": Instrument( + "lsstcam", + lsst.obs.lsst.translators.LsstCamTranslator, + LSSTCAM_MAPPING, + DETECTOR_MAPPING, + lsst.obs.lsst.LsstCam.getCamera(), + ), + } + else: + raise ValueError("Unrecognized instrument: {instrument}") + topic = f"lsst.sal.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable" kafka_config = get_kafka_config() async with httpx.AsyncClient() as client: schema_registry = kafkit.registry.httpx.RegistryApi(http_client=client, url=kafka_config.schema_url) @@ -477,7 +553,7 @@ async def main() -> None: logging.info(f"Waiting for {url}") while not resource.exists(): await asyncio.sleep(random.uniform(0.1, 2.0)) - process_resource(resource) + process_resource(resource, instrument_dict) logging.info(f"Processed {url}") finally: await consumer.stop()