diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 1a190abe..19541a6a 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -280,7 +280,7 @@ def process_column(column_def: str | Sequence, info: dict) -> Any: return fn(*arg_values) -def process_resource(resource: ResourcePath) -> None: +def process_resource(resource: ResourcePath, update: bool = False) -> None: """Process a header resource. Uses configured mappings and the ObservationInfo translator to generate @@ -292,7 +292,7 @@ def process_resource(resource: ResourcePath) -> None: Path to the Header Service header resource. """ global KW_MAPPING, OI_MAPPING, instrument_mapping, det_mapping, translator - global engine, exposure_table + global engine, exposure_table, ccdexposure_table exposure_rec = dict() @@ -312,8 +312,12 @@ def process_resource(resource: ResourcePath) -> None: for field, keyword in OI_MAPPING.items(): exposure_rec[field] = process_column(keyword, obs_info) - logging.debug(f"Inserting {exposure_rec}") - stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() + stmt = insert(exposure_table).values(exposure_rec) + if update: + stmt = stmt.on_conflict_do_update(index_elements=["exposure_id"], set_=exposure_rec) + else: + stmt = stmt.on_conflict_do_nothing() + logging.debug(str(stmt)) with engine.begin() as conn: conn.execute(stmt) @@ -326,11 +330,20 @@ def process_resource(resource: ResourcePath) -> None: det_info["detector"] = camera[ccdname].getId() for header in content[detector]: det_info[header["keyword"]] = header["value"] - for field, keyword in det_mapping.items(): - det_exposure_rec[field] = process_column(keyword, det_info) + for column, column_def in det_mapping.items(): + det_exposure_rec[column] = process_column(column_def, det_info) + + stmt = insert(ccdexposure_table).values(det_exposure_rec) + if update: + stmt = stmt.on_conflict_do_update(index_elements=["ccdexposure_id"], set_=det_exposure_rec) + else: + stmt = stmt.on_conflict_do_nothing() + logging.debug(str(stmt)) + with engine.begin() as conn: + conn.execute(stmt) -def process_date(day_obs: str) -> None: +def process_date(day_obs: str, update: bool = False) -> None: """Process all headers from a given observation day (as YYYY-MM-DD). Parameters @@ -344,7 +357,7 @@ def process_date(day_obs: str) -> None: d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/") for dirpath, dirnames, filenames in d.walk(): for fname in filenames: - process_resource(d.join(fname)) + process_resource(d.join(fname), update) ################## @@ -412,6 +425,7 @@ def get_kafka_config() -> KafkaConfig: engine = setup_postgres() metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") exposure_table = Table("exposure", metadata_obj, autoload_with=engine) +ccdexposure_table = Table("ccdexposure", metadata_obj, autoload_with=engine) bucket_prefix = os.environ.get("BUCKET_PREFIX", "")