Skip to content

Commit

Permalink
Merge pull request #15 from lsst-dm/tickets/DM-44429
Browse files Browse the repository at this point in the history
DM-44429: Update hinfo for CcdExposure table and include focal plane and detector regions.
  • Loading branch information
ktlim authored May 28, 2024
2 parents a0fa3e5 + 7f3e472 commit ecbea1a
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions python/lsst/consdb/hinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None:
"ccdexposure_id": (ccdexposure_id, "exposure_id", "detector"),
"exposure_id": "exposure_id",
"detector": "detector",
"s_region": (ccd_region, "IMGTYPE", "RA", "DEC", "ROTPA", "_CCDNAME"),
"s_region": (ccd_region, "IMGTYPE", "RA", "DEC", "ROTPA", "ccdname"),
}

# Mapping to column name from ObservationInfo keyword
Expand Down Expand Up @@ -256,17 +256,17 @@ def process_column(column_def: str | Sequence, info: dict) -> Any:
Parameters
----------
column_def: `str`
column_def : `str`
Definition of the column. Either a string specifying the info keyword
to use as the column value, or a tuple containing a function to apply
to the values of one or more info keywords or function application
tuples.
info: `dict`
info : `dict`
A dictionary containing keyword/value pairs.
Returns
-------
column_value: `Any`
column_value : `Any`
The value to use for the column.
None if any input value is missing.
"""
Expand All @@ -280,19 +280,19 @@ def process_column(column_def: str | Sequence, info: dict) -> Any:
return fn(*arg_values)


def process_resource(resource: ResourcePath) -> None:
def process_resource(resource: ResourcePath, update: bool = False) -> None:
"""Process a header resource.
Uses configured mappings and the ObservationInfo translator to generate
column values that are inserted into the exposure table.
Parameters
----------
resource: `ResourcePath`
resource : `ResourcePath`
Path to the Header Service header resource.
"""
global KW_MAPPING, OI_MAPPING, instrument_mapping, det_mapping, translator
global engine, exposure_table
global engine, exposure_table, ccdexposure_table

exposure_rec = dict()

Expand All @@ -312,8 +312,12 @@ def process_resource(resource: ResourcePath) -> None:
for field, keyword in OI_MAPPING.items():
exposure_rec[field] = process_column(keyword, obs_info)

logging.debug(f"Inserting {exposure_rec}")
stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing()
stmt = insert(exposure_table).values(exposure_rec)
if update:
stmt = stmt.on_conflict_do_update(index_elements=["exposure_id"], set_=exposure_rec)
else:
stmt = stmt.on_conflict_do_nothing()
logging.debug(str(stmt))
with engine.begin() as conn:
conn.execute(stmt)

Expand All @@ -326,16 +330,25 @@ def process_resource(resource: ResourcePath) -> None:
det_info["detector"] = camera[ccdname].getId()
for header in content[detector]:
det_info[header["keyword"]] = header["value"]
for field, keyword in det_mapping.items():
det_exposure_rec[field] = process_column(keyword, det_info)
for column, column_def in det_mapping.items():
det_exposure_rec[column] = process_column(column_def, det_info)

stmt = insert(ccdexposure_table).values(det_exposure_rec)
if update:
stmt = stmt.on_conflict_do_update(index_elements=["ccdexposure_id"], set_=det_exposure_rec)
else:
stmt = stmt.on_conflict_do_nothing()
logging.debug(str(stmt))
with engine.begin() as conn:
conn.execute(stmt)


def process_date(day_obs: str) -> None:
def process_date(day_obs: str, update: bool = False) -> None:
"""Process all headers from a given observation day (as YYYY-MM-DD).
Parameters
----------
day_obs: `str`
day_obs : `str`
Observation day to process, as YYYY-MM-DD.
"""
global TOPIC_MAPPING, bucket_prefix, instrument
Expand All @@ -344,7 +357,7 @@ def process_date(day_obs: str) -> None:
d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/")
for dirpath, dirnames, filenames in d.walk():
for fname in filenames:
process_resource(d.join(fname))
process_resource(d.join(fname), update)


##################
Expand Down Expand Up @@ -412,6 +425,7 @@ def get_kafka_config() -> KafkaConfig:
engine = setup_postgres()
metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}")
exposure_table = Table("exposure", metadata_obj, autoload_with=engine)
ccdexposure_table = Table("ccdexposure", metadata_obj, autoload_with=engine)


bucket_prefix = os.environ.get("BUCKET_PREFIX", "")
Expand Down

0 comments on commit ecbea1a

Please sign in to comment.