Skip to content

Commit

Permalink
Add ccdexposure table and update capability.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed May 28, 2024
1 parent 46cc6c3 commit 67a5a93
Showing 1 changed file with 70 additions and 21 deletions.
91 changes: 70 additions & 21 deletions python/lsst/consdb/hinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def region(ra: float, dec: float, rotpa: float, points: list[tuple[str, int, int
for detector, offset_x, offset_y in points:
skywcs = formatter.makeRawSkyWcsFromBoresight(
lsst.geom.SpherePoint(ra, dec, lsst.geom.degrees),
rotpa*lsst.geom.degrees,
rotpa * lsst.geom.degrees,
camera[detector],
)
bbox = camera[detector].getBBox()
Expand All @@ -73,9 +73,17 @@ def ccdexposure_id(exposure_id: int, detector: int) -> int:
def ccd_region(imgtype: str, ra: float, dec: float, rotpa: float, ccdname: str) -> str | None:
if imgtype != "OBJECT":
return None
return region(ra, dec, rotpa, [
(ccdname, 0, 0), (ccdname, 1, 0), (ccdname, 1, 1), (ccdname, 0, 1),
])
return region(
ra,
dec,
rotpa,
[
(ccdname, 0, 0),
(ccdname, 1, 0),
(ccdname, 1, 1),
(ccdname, 0, 1),
],
)


def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None:
Expand All @@ -84,21 +92,48 @@ def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None:
return None
if instrument == "LATISS":
corners = [
("RXX_S00", 0, 0), ("RXX_S00", 1, 0), ("RXX_S00", 1, 1), ("RXX_S00", 0, 1),
("RXX_S00", 0, 0),
("RXX_S00", 1, 0),
("RXX_S00", 1, 1),
("RXX_S00", 0, 1),
]
elif instrument == "LSSTComCam" or instrument == "LSSTComCamSim":
corners = [
("R22_S00", 0, 0), ("R22_S02", 1, 0), ("R22_S20", 0, 1), ("R22_S22", 1, 1),
("R22_S00", 0, 0),
("R22_S02", 1, 0),
("R22_S20", 0, 1),
("R22_S22", 1, 1),
]
elif instrument == "LSSTCam":
corners = [
("R01_S00", 0, 0), ("R03_S02", 1, 0), ("R03_S02", 1, 1), ("R04_SG1", 1, 0),
("R04_SG1", 1, 1), ("R04_SG0", 1, 0), ("R04_SG0", 1, 1), ("R14_S02", 1, 0),
("R34_S22", 1, 1), ("R34_S22", 0, 1), ("R44_SG1", 1, 1), ("R44_SG1", 0, 1),
("R44_SG0", 1, 1), ("R44_SG0", 0, 1), ("R43_S22", 1, 1), ("R41_S20", 0, 1),
("R41_S20", 0, 0), ("R40_SG1", 0, 1), ("R40_SG1", 0, 0), ("R40_SG0", 0, 1),
("R40_SG0", 0, 0), ("R30_S20", 0, 1), ("R10_S00", 0, 0), ("R10_S00", 1, 0),
("R00_SG1", 0, 0), ("R00_SG1", 1, 0), ("R00_SG0", 0, 0), ("R00_SG0", 1, 0),
("R01_S00", 0, 0),
("R03_S02", 1, 0),
("R03_S02", 1, 1),
("R04_SG1", 1, 0),
("R04_SG1", 1, 1),
("R04_SG0", 1, 0),
("R04_SG0", 1, 1),
("R14_S02", 1, 0),
("R34_S22", 1, 1),
("R34_S22", 0, 1),
("R44_SG1", 1, 1),
("R44_SG1", 0, 1),
("R44_SG0", 1, 1),
("R44_SG0", 0, 1),
("R43_S22", 1, 1),
("R41_S20", 0, 1),
("R41_S20", 0, 0),
("R40_SG1", 0, 1),
("R40_SG1", 0, 0),
("R40_SG0", 0, 1),
("R40_SG0", 0, 0),
("R30_S20", 0, 1),
("R10_S00", 0, 0),
("R10_S00", 1, 0),
("R00_SG1", 0, 0),
("R00_SG1", 1, 0),
("R00_SG0", 0, 0),
("R00_SG0", 1, 0),
]
else:
return None
Expand Down Expand Up @@ -243,7 +278,7 @@ def process_column(column_def: str | Sequence, info: dict) -> Any:
return fn(*arg_values)


def process_resource(resource: ResourcePath) -> None:
def process_resource(resource: ResourcePath, update: bool = False) -> None:
"""Process a header resource.
Uses configured mappings and the ObservationInfo translator to generate
Expand All @@ -255,7 +290,7 @@ def process_resource(resource: ResourcePath) -> None:
Path to the Header Service header resource.
"""
global KW_MAPPING, OI_MAPPING, instrument_mapping, det_mapping, translator
global engine, exposure_table
global engine, exposure_table, ccdexposure_table

exposure_rec = dict()

Expand All @@ -275,8 +310,12 @@ def process_resource(resource: ResourcePath) -> None:
for field, keyword in OI_MAPPING.items():
exposure_rec[field] = process_column(keyword, obs_info)

logging.debug(f"Inserting {exposure_rec}")
stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing()
stmt = insert(exposure_table).values(exposure_rec)
if update:
stmt = stmt.on_conflict_do_update(index_elements=["exposure_id"], set_=exposure_rec)
else:
stmt = stmt.on_conflict_do_nothing()
logging.debug(str(stmt))
with engine.begin() as conn:
conn.execute(stmt)

Expand All @@ -289,11 +328,20 @@ def process_resource(resource: ResourcePath) -> None:
det_info["detector"] = camera[ccdname].getId()
for header in content[detector]:
det_info[header["keyword"]] = header["value"]
for field, keyword in det_mapping.items():
det_exposure_rec[field] = process_column(keyword, det_info)
for column, column_def in det_mapping.items():
det_exposure_rec[column] = process_column(column_def, det_info)

stmt = insert(ccdexposure_table).values(det_exposure_rec)
if update:
stmt = stmt.on_conflict_do_update(index_elements=["ccdexposure_id"], set_=det_exposure_rec)
else:
stmt = stmt.on_conflict_do_nothing()
logging.debug(str(stmt))
with engine.begin() as conn:
conn.execute(stmt)


def process_date(day_obs: str) -> None:
def process_date(day_obs: str, update: bool = False) -> None:
"""Process all headers from a given observation day (as YYYY-MM-DD).
Parameters
Expand All @@ -307,7 +355,7 @@ def process_date(day_obs: str) -> None:
d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/")
for dirpath, dirnames, filenames in d.walk():
for fname in filenames:
process_resource(d.join(fname))
process_resource(d.join(fname), update)


##################
Expand Down Expand Up @@ -375,6 +423,7 @@ def get_kafka_config() -> KafkaConfig:
engine = setup_postgres()
metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}")
exposure_table = Table("exposure", metadata_obj, autoload_with=engine)
ccdexposure_table = Table("ccdexposure", metadata_obj, autoload_with=engine)


bucket_prefix = os.environ.get("BUCKET_PREFIX", "")
Expand Down

0 comments on commit 67a5a93

Please sign in to comment.