From b925ea13ad518315a9f13a22238ef1c349dd139b Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 28 Feb 2024 02:55:42 -0800 Subject: [PATCH] Incorporate Kafka and handle all instruments. --- Dockerfile.hinfo | 15 +- python/lsst/consdb/header_proc.py | 89 --------- python/lsst/consdb/hinfo-latiss.py | 179 ----------------- python/lsst/consdb/hinfo.py | 306 +++++++++++++++++++++++++++++ 4 files changed, 317 insertions(+), 272 deletions(-) delete mode 100644 python/lsst/consdb/header_proc.py delete mode 100644 python/lsst/consdb/hinfo-latiss.py create mode 100644 python/lsst/consdb/hinfo.py diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo index e2a988f4..1a02b88a 100644 --- a/Dockerfile.hinfo +++ b/Dockerfile.hinfo @@ -3,10 +3,17 @@ FROM lsstsqre/newinstall:${RUBINENV_VERSION} ARG OBS_LSST_VERSION ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_06} USER lsst -RUN source loadLSST.bash && pip install confluent-kafka +RUN source loadLSST.bash && mamba install aiokafka httpx +RUN source loadLSST.bash && pip install kafkit RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst -COPY python/lsst/consdb/hinfo*.py ./hinfo/ +COPY python/lsst/consdb/hinfo.py ./hinfo/ + # Environment variables that must be set: -# POSTGRES_URL INSTRUMENT SITE +# INSTRUMENT: LATISS, LSSTComCam, LSSTComCamSim, LSSTCam +# POSTGRES_URL: SQLAlchemy connection URL +# KAFKA_BOOTSTRAP: AIOKafkaConsumer bootstrap server specification +# SCHEMA_URL: Kafkit registry schema URL +# Optional environment variable: +# BUCKET_PREFIX: "rubin:" at USDF -ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python .hinfo/hinfo-latiss.py" ] +ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ./hinfo/hinfo.py" ] diff --git a/python/lsst/consdb/header_proc.py b/python/lsst/consdb/header_proc.py deleted file mode 100644 index 4c9ae206..00000000 --- a/python/lsst/consdb/header_proc.py +++ /dev/null @@ -1,89 +0,0 @@ -import asyncio -import json -import os -import random -import time -from typing import TYPE_CHECKING - -import aiokafka -import httpx -import kafkit -from lsst.resources import ResourcePath -from sqlalchemy import create_engine - -if TYPE_CHECKING: - import lsst.resources - -# Environment variables from deployment - -kafka_cluster = os.environ["KAFKA_CLUSTER"] -schema_url = os.environ["SCHEMA_URL"] -db_url = os.environ["DB_URL"] -tenant = os.environ.get("BUCKET_TENANT", None) -kafka_group_id = 1 - -topic = "lsst.ATHeaderService.logevent_largeFileObjectAvailable" - -engine = create_engine(db_url) - - -def process_resource(resource: lsst.resources.ResourcePath) -> None: - content = json.loads(resource.read()) - with engine.begin() as conn: - print(conn) - # TODO get all fields and tables, do as a transaction - # conn.execute( - # text("INSERT INTO exposure (a, b, c, d, e)" - # " VALUES(:a, :b, :c, :d, :e)"), - # [dict(a=content["something"], b=2, c=3, d=4, e=5)], - # ) - print(f"Processing {resource}: {content[0:100]}") - # TODO check result - - -async def main() -> None: - async with httpx.AsyncClient() as client: - schema_registry = kafkit.registry.RegistryApi(client=client, url=schema_url) - deserializer = kafkit.registry.Deserializer(registry=schema_registry) - - # Alternative 2: - # Something like - # asyncio.run(queue_check()) - - consumer = aiokafka.AIOKafkaConsumer( - topic, - bootstrap_servers=kafka_cluster, - group_id=str(kafka_group_id), - auto_offset_reset="earliest", - ) - await consumer.start() - try: - async for msg in consumer: - message = (await 
deserializer.deserialize(msg.value)).message - resource = ResourcePath(message.url) - if tenant: - new_scheme = resource.scheme - new_netloc = f"{tenant}:{resource.netloc}" - new_path = resource.path - resource = ResourcePath(f"{new_scheme}://{new_netloc}/{new_path}") - # Alternative 1: block for file - while not resource.exists(): - time.sleep(random.uniform(0.1, 2.0)) - process_resource(resource) - - # Alternative 2: queue - # r.sadd("EXPOSURE_QUEUE", str(resource)) - - finally: - await consumer.stop() - - -# Alternative 2: -# async def queue_check(): -# resource_list = r.slist("EXPOSURE_QUEUE", 0, -1) -# for resource in resource_list: -# if r.exists(): -# process_resource(resource) -# r.sremove("EXPOSURE_QUEUE", resource) - -asyncio.run(main()) diff --git a/python/lsst/consdb/hinfo-latiss.py b/python/lsst/consdb/hinfo-latiss.py deleted file mode 100644 index df30508d..00000000 --- a/python/lsst/consdb/hinfo-latiss.py +++ /dev/null @@ -1,179 +0,0 @@ -from datetime import datetime -import os -import sys -from typing import Any, Iterable - -import yaml -from astropy.time import Time -from sqlalchemy import create_engine, MetaData, Table -from sqlalchemy.dialects.postgresql import insert - -from astro_metadata_translator import ObservationInfo -from lsst.resources import ResourcePath -from lsst.obs.lsst.translators import LatissTranslator - -# import Kafka interface - - -def ninety_minus(angle: float) -> float: - return 90.0 - angle - - -def tai_convert(t: str) -> datetime: - return Time(t, format="isot", scale="tai").datetime - - -def tai_mean(start: str, end: str) -> datetime: - s = Time(start, format="isot", scale="tai") - e = Time(end, format="isot", scale="tai") - return (s + (e - s) / 2).datetime - - -def mean(*iterable: Iterable[Any]) -> Any: - return sum(iterable) / len(iterable) - - -def logical_or(*bools: Iterable[int | str | None]) -> bool: - return any([b == 1 or b == "1" for b in bools]) - - -KW_MAPPING = { - "controller": "CONTRLLR", - "seq_num": "SEQNUM", - "band": "FILTBAND", - "ra": "RA", - "decl": "DEC", - "skyrotation": "ROTPA", - "azimuth_start": "AZSTART", - "azimuth_end": "AZEND", - "altitude_start": (ninety_minus, "ELSTART"), - "altitude_end": (ninety_minus, "ELEND"), - "zenithdistance_start": "ELSTART", - "zenithdistance_end": "ELEND", - "expmidpt": (tai_mean, "DATE-BEG", "DATE-END"), - "expmidptmjd": (mean, "MJD-BEG", "MJD-END"), - "obsstart": (tai_convert, "DATE-BEG"), - "obsstartmjd": "MJD-BEG", - "obsend": (tai_convert, "DATE-END"), - "obsendmjd": "MJD-END", - "exptime": "EXPTIME", - "shuttime": "SHUTTIME", - "darktime": "DARKTIME", - "group_id": "GROUPID", - "curindex": "CURINDEX", - "maxindex": "MAXINDEX", - "imgtype": "IMGTYPE", - "emulated": (logical_or, "EMUIMAGE"), - "science_program": "PROGRAM", - "observation_reason": "REASON", - "target_name": "OBJECT", - "airtemp": "AIRTEMP", - "pressure": "PRESSURE", - "humidity": "HUMIDITY", - "wind_speed": "WINDSPD", - "wind_dir": "WINDDIR", - "dimm_seeing": "SEEING", -} - -LATISS_MAPPING = { - "focus_z": "FOCUSZ", - "dome_azimuth": "DOMEAZ", - "shut_lower": "SHUTLOWR", - "shut_upper": "SHUTUPPR", - # "temp_set": "TEMP_SET", - "simulated": (logical_or, "SIMULATE ATMCS", "SIMULATE ATHEXAPOD", "SIMULAT ATPNEUMATICS", - "SIMULATE ATDOME", "SIMULATE ATSPECTROGRAPH"), -} - -# LATISS_DETECTOR_MAPPING = { -# "ccdtemp": "CCDTEMP", -# } - -OI_MAPPING = { - "exposure_id": "exposure_id", - "physical_filter": "physical_filter", - "airmass": "boresight_airmass", - "day_obs": "observing_day", -} - -TOPIC_MAPPING = { - "LATISS": 
"ATHeaderService", - "LSSTComCam": "CCHeaderService", - "LSSTCam": "MTHeaderService", -} - - -url = os.environ.get("POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1") -engine = create_engine(url) -instrument = os.environ.get("INSTRUMENT", "LATISS") -metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") -exposure_table = Table("exposure", metadata_obj, autoload_with=engine) - - -def process_keyword(keyword: str | tuple, info: dict) -> Any: - if type(keyword) is str: - if keyword in info: - return info[keyword] - elif type(keyword) is tuple: - fn = keyword[0] - args = keyword[1:] - if all([a in info for a in args]): - return fn(*[info[a] for a in args]) - - -def process_resource(resource: ResourcePath) -> None: - global engine, exposure_table - - content = yaml.safe_load(resource.read()) - exposure_rec = dict() - - info = dict() - for header in content["PRIMARY"]: - info[header["keyword"]] = header["value"] - for field, keyword in KW_MAPPING.items(): - exposure_rec[field] = process_keyword(keyword, info) - for field, keyword in LATISS_MAPPING.items(): - exposure_rec[field] = process_keyword(keyword, info) - -# det_info = dict() -# for header in content["R00S00_PRIMARY"]: -# det_info[header["keyword"]] = header["value"] -# for field, keyword in LATISS_DETECTOR_MAPPING.items(): -# det_exposure_rec[field] = process_keyword(keyword, det_info) - - obs_info_obj = ObservationInfo(info, translator_class=LatissTranslator) - obs_info = dict() - for keyword in OI_MAPPING.values(): - obs_info[keyword] = getattr(obs_info_obj, keyword) - for field, keyword in OI_MAPPING.items(): - exposure_rec[field] = process_keyword(keyword, obs_info) - - stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() - with engine.begin() as conn: - conn.execute(stmt) - # result = conn.execute(stmt) - - # print(exposure_rec) - - -site = os.environ.get("SITE", "USDF") -if site == "USDF": - os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" - bucket_prefix = "rubin:" -else: - bucket_prefix = "" - -# For Kafka: -# consumer = configure_kafka() -# while True: -# msgs = consumer.consume() -# for msg in msgs: -# re.sub(r"s3://", "s3://" + bucket_prefix, msg.data) -# process_resource(msg.data) - -# To process all of a given date: -date = "/".join(sys.argv[1].split("-")) -d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/") -for dirpath, dirnames, filenames in d.walk(): - for fname in filenames: - process_resource(d.join(fname)) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py new file mode 100644 index 00000000..2687c3a7 --- /dev/null +++ b/python/lsst/consdb/hinfo.py @@ -0,0 +1,306 @@ +import asyncio +import os +import random +import re +from datetime import datetime +from typing import Any, Sequence + +import aiokafka +import astropy.time +import httpx +import kafkit +import yaml +from astro_metadata_translator import ObservationInfo +from lsst.resources import ResourcePath +from sqlalchemy import MetaData, Table, create_engine +from sqlalchemy.dialects.postgresql import insert + +############################### +# Header Processing Functions # +############################### + + +def ninety_minus(angle: float) -> float: + return 90.0 - angle + + +def tai_convert(t: str) -> datetime: + return astropy.time.Time(t, format="isot", scale="tai").datetime + + +def tai_mean(start: str, end: str) -> datetime: + s = astropy.time.Time(start, format="isot", scale="tai") + e = astropy.time.Time(end, format="isot", scale="tai") 
+ return (s + (e - s) / 2).datetime + + +def mean(*iterable: float) -> Any: + return sum(iterable) / len(iterable) + + +def logical_or(*bools: int | str | None) -> bool: + return any([b == 1 or b == "1" for b in bools]) + + +################################# +# Header Mapping Configurations # +################################# + +# Non-instrument-specific mapping to column name from Header Service keyword +KW_MAPPING: dict[str, str | Sequence] = { + "controller": "CONTRLLR", + "seq_num": "SEQNUM", + "band": "FILTBAND", + "ra": "RA", + "decl": "DEC", + "skyrotation": "ROTPA", + "azimuth_start": "AZSTART", + "azimuth_end": "AZEND", + "altitude_start": (ninety_minus, "ELSTART"), + "altitude_end": (ninety_minus, "ELEND"), + "zenithdistance_start": "ELSTART", + "zenithdistance_end": "ELEND", + "expmidpt": (tai_mean, "DATE-BEG", "DATE-END"), + "expmidptmjd": (mean, "MJD-BEG", "MJD-END"), + "obsstart": (tai_convert, "DATE-BEG"), + "obsstartmjd": "MJD-BEG", + "obsend": (tai_convert, "DATE-END"), + "obsendmjd": "MJD-END", + "exptime": "EXPTIME", + "shuttime": "SHUTTIME", + "darktime": "DARKTIME", + "group_id": "GROUPID", + "curindex": "CURINDEX", + "maxindex": "MAXINDEX", + "imgtype": "IMGTYPE", + "emulated": (logical_or, "EMUIMAGE"), + "science_program": "PROGRAM", + "observation_reason": "REASON", + "target_name": "OBJECT", + "airtemp": "AIRTEMP", + "pressure": "PRESSURE", + "humidity": "HUMIDITY", + "wind_speed": "WINDSPD", + "wind_dir": "WINDDIR", + "dimm_seeing": "SEEING", +} + +# Instrument-specific mapping to column name from Header Service keyword +LATISS_MAPPING: dict[str, str | Sequence] = { + "focus_z": "FOCUSZ", + "dome_azimuth": "DOMEAZ", + "shut_lower": "SHUTLOWR", + "shut_upper": "SHUTUPPR", + # "temp_set": "TEMP_SET", + "simulated": ( + logical_or, + "SIMULATE ATMCS", + "SIMULATE ATHEXAPOD", + "SIMULAT ATPNEUMATICS", + "SIMULATE ATDOME", + "SIMULATE ATSPECTROGRAPH", + ), +} + +LSSTCOMCAM_MAPPING: dict[str, str | Sequence] = {} +LSSTCOMCAMSIM_MAPPING: dict[str, str | Sequence] = {} +LSSTCAM_MAPPING: dict[str, str | Sequence] = {} + +# LATISS_DETECTOR_MAPPING = { +# "ccdtemp": "CCDTEMP", +# } + +# Mapping to column name from ObservationInfo keyword +OI_MAPPING = { + "exposure_id": "exposure_id", + "physical_filter": "physical_filter", + "airmass": "boresight_airmass", + "day_obs": "observing_day", +} + +# Mapping from instrument name to Header Service topic name +TOPIC_MAPPING = { + "LATISS": "ATHeaderService", + "LSSTComCam": "CCHeaderService", + "LSSTComCamSim": "CCHeaderService", + "LSSTCam": "MTHeaderService", +} + + +######################## +# Processing Functions # +######################## + + +def process_column(column_def: str | Sequence, info: dict) -> Any: + """Generate a column value from one or more keyword values in a dict. + + The dict may contain FITS headers or ObservationInfo. + + Parameters + ---------- + column_def: `str` + Definition of the column. Either a string specifying the info keyword + to use as the column value, or a tuple containing a function to apply + to the values of one or more info keywords. + info: `dict` + A dictionary containing keyword/value pairs. + + Returns + ------- + column_value: `Any` + The value to use for the column. 
+ """ + if type(column_def) is str: + if column_def in info: + return info[column_def] + elif type(column_def) is tuple: + fn = column_def[0] + args = column_def[1:] + if all([a in info for a in args]): + return fn(*[info[a] for a in args]) + + +def process_resource(resource: ResourcePath) -> None: + """Process a header resource. + + Uses configured mappings and the ObservationInfo translator to generate + column values that are inserted into the exposure table. + + Parameters + ---------- + resource: `ResourcePath` + Path to the Header Service header resource. + """ + global KW_MAPPING, OI_MAPPING, instrument_mapping, translator + global engine, exposure_table + + exposure_rec = dict() + + info = dict() + content = yaml.safe_load(resource.read()) + for header in content["PRIMARY"]: + info[header["keyword"]] = header["value"] + for column, column_def in KW_MAPPING.items(): + exposure_rec[column] = process_column(column_def, info) + for column, column_def in instrument_mapping.items(): + exposure_rec[column] = process_column(column_def, info) + + obs_info_obj = ObservationInfo(info, translator_class=translator) + obs_info = dict() + for keyword in OI_MAPPING.values(): + obs_info[keyword] = getattr(obs_info_obj, keyword) + for field, keyword in OI_MAPPING.items(): + exposure_rec[field] = process_column(keyword, obs_info) + + stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() + with engine.begin() as conn: + conn.execute(stmt) + + # TODO: exposure_detector table processing + # det_info = dict() + # for header in content["R00S00_PRIMARY"]: + # det_info[header["keyword"]] = header["value"] + # for field, keyword in LATISS_DETECTOR_MAPPING.items(): + # det_exposure_rec[field] = process_column(keyword, det_info) + + +def process_date(day_obs: str) -> None: + """Process all headers from a given observation day (as YYYY-MM-DD). + + Parameters + ---------- + day_obs: `str` + Observation day to process, as YYYY-MM-DD. 
+ """ + global TOPIC_MAPPING, bucket_prefix, instrument + + date = "/".join(day_obs.split("-")) + d = ResourcePath( + f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/" + ) + for dirpath, dirnames, filenames in d.walk(): + for fname in filenames: + process_resource(d.join(fname)) + + +################## +# Initialization # +################## + +instrument = os.environ.get("INSTRUMENT", "LATISS") +match instrument: + case "LATISS": + from lsst.obs.lsst.translators import LatissTranslator + + translator = LatissTranslator + instrument_mapping = LATISS_MAPPING + case "LSSTComCam": + from lsst.obs.lsst.translators import LsstComCamTranslator + + translator = LsstComCamTranslator + instrument_mapping = LSSTCOMCAM_MAPPING + case "LSSTComCamSim": + from lsst.obs.lsst.translators import LsstComCamSimTranslator + + translator = LsstComCamSimTranslator + instrument_mapping = LSSTCOMCAMSIM_MAPPING + case "LSSTCam": + from lsst.obs.lsst.translators import LsstCamTranslator + + translator = LsstCamTranslator + instrument_mapping = LSSTCAM_MAPPING + +url = os.environ.get( + "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1" +) +engine = create_engine(url) +metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") +exposure_table = Table("exposure", metadata_obj, autoload_with=engine) + + +bucket_prefix = os.environ.get("BUCKET_PREFIX", "") +if bucket_prefix: + os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" + +kafka_bootstrap = os.environ["KAFKA_BOOTSTRAP"] +schema_url = os.environ["SCHEMA_URL"] +kafka_group_id = "1" + +topic = f"lsst.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable" + + +################# +# Main Function # +################# + + +async def main() -> None: + """Handle Header Service largeFileObjectAvailable messages.""" + global bucket_prefix, kafka_bootstrap, kafka_group_id, schema_url, topic + + async with httpx.AsyncClient() as client: + schema_registry = kafkit.registry.RegistryApi(client=client, url=schema_url) + deserializer = kafkit.registry.Deserializer(registry=schema_registry) + + consumer = aiokafka.AIOKafkaConsumer( + topic, + bootstrap_servers=kafka_bootstrap, + group_id=kafka_group_id, + auto_offset_reset="earliest", + ) + await consumer.start() + try: + async for msg in consumer: + message = (await deserializer.deserialize(msg.value)).message + if bucket_prefix: + url = re.sub(r"s3://", "s3://" + bucket_prefix, message.url) + resource = ResourcePath(url) + while not resource.exists(): + await asyncio.sleep(random.uniform(0.1, 2.0)) + process_resource(resource) + finally: + await consumer.stop() + + +asyncio.run(main())
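
For reference, the mapping tables in hinfo.py all follow one convention: each exposure column is defined either by the name of a single Header Service keyword, whose value is copied as-is, or by a tuple whose first element is a function applied to the values of the remaining keywords. Below is a minimal standalone sketch of that convention; process_column is a simplified restatement of the patch's function, and the header values and mapping entries are hypothetical, for illustration only.

from typing import Any, Sequence


def ninety_minus(angle: float) -> float:
    return 90.0 - angle


def mean(*values: float) -> float:
    return sum(values) / len(values)


def process_column(column_def: str | Sequence, info: dict) -> Any:
    # A plain string names the keyword whose value is used directly.
    if isinstance(column_def, str):
        if column_def in info:
            return info[column_def]
    # A tuple is (function, keyword, ...); the function is applied to the
    # keyword values, but only when every keyword is present in the header.
    elif isinstance(column_def, tuple):
        fn, *args = column_def
        if all(a in info for a in args):
            return fn(*[info[a] for a in args])
    return None


# Hypothetical header values, for illustration only.
info = {"SEQNUM": 42, "ELSTART": 70.0, "MJD-BEG": 60368.25, "MJD-END": 60368.75}
mapping = {
    "seq_num": "SEQNUM",
    "altitude_start": (ninety_minus, "ELSTART"),
    "expmidptmjd": (mean, "MJD-BEG", "MJD-END"),
}
row = {column: process_column(defn, info) for column, defn in mapping.items()}
print(row)
# {'seq_num': 42, 'altitude_start': 20.0, 'expmidptmjd': 60368.5}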
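
At USDF the LFA objects live in a tenant-prefixed bucket, so the URL carried by each largeFileObjectAvailable message is rewritten with BUCKET_PREFIX before the consumer polls for the object to appear. The sketch below shows that step in isolation; wait_for_header is a hypothetical helper name, the existence check is a stub standing in for lsst.resources.ResourcePath.exists(), and the message URL is made up. Note that an empty prefix leaves the URL unchanged.

import asyncio
import random
import re


async def wait_for_header(message_url: str, bucket_prefix: str = "") -> str:
    # Insert the optional tenant prefix after the scheme; with an empty
    # prefix the substitution is a no-op.
    url = re.sub(r"s3://", "s3://" + bucket_prefix, message_url)

    def exists(u: str) -> bool:
        # Stub; the real service checks ResourcePath(u).exists().
        return True

    # Poll with a small random sleep until the announced object is visible.
    while not exists(url):
        await asyncio.sleep(random.uniform(0.1, 2.0))
    return url


# Hypothetical message URL, for illustration only.
print(asyncio.run(wait_for_header(
    "s3://rubinobs-lfa-cp/ATHeaderService/header/2024/02/28/HS_header.yaml",
    bucket_prefix="rubin:",
)))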
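
Besides the Kafka path, process_date() can walk all headers for a given observation day directly from the LFA bucket (the deleted hinfo-latiss.py did this from sys.argv). The per-day prefix it walks combines the bucket prefix, the instrument's Header Service name from TOPIC_MAPPING, and the day split into path components; a small sketch with hypothetical values follows.

# Hypothetical values; the real module takes these from the INSTRUMENT and
# BUCKET_PREFIX environment variables and from TOPIC_MAPPING.
bucket_prefix = "rubin:"
header_service = "ATHeaderService"  # TOPIC_MAPPING["LATISS"]
day_obs = "2024-02-28"

date = "/".join(day_obs.split("-"))
prefix = f"s3://{bucket_prefix}rubinobs-lfa-cp/{header_service}/header/{date}/"
print(prefix)
# s3://rubin:rubinobs-lfa-cp/ATHeaderService/header/2024/02/28/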