Skip to content

Commit

Permalink
Add ccdexposure table and update capability.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed May 28, 2024
1 parent 3cae561 commit 7f3e472
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions python/lsst/consdb/hinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def process_column(column_def: str | Sequence, info: dict) -> Any:
return fn(*arg_values)


def process_resource(resource: ResourcePath) -> None:
def process_resource(resource: ResourcePath, update: bool = False) -> None:
"""Process a header resource.
Uses configured mappings and the ObservationInfo translator to generate
Expand All @@ -292,7 +292,7 @@ def process_resource(resource: ResourcePath) -> None:
Path to the Header Service header resource.
"""
global KW_MAPPING, OI_MAPPING, instrument_mapping, det_mapping, translator
global engine, exposure_table
global engine, exposure_table, ccdexposure_table

exposure_rec = dict()

Expand All @@ -312,8 +312,12 @@ def process_resource(resource: ResourcePath) -> None:
for field, keyword in OI_MAPPING.items():
exposure_rec[field] = process_column(keyword, obs_info)

logging.debug(f"Inserting {exposure_rec}")
stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing()
stmt = insert(exposure_table).values(exposure_rec)
if update:
stmt = stmt.on_conflict_do_update(index_elements=["exposure_id"], set_=exposure_rec)
else:
stmt = stmt.on_conflict_do_nothing()
logging.debug(str(stmt))
with engine.begin() as conn:
conn.execute(stmt)

Expand All @@ -326,11 +330,20 @@ def process_resource(resource: ResourcePath) -> None:
det_info["detector"] = camera[ccdname].getId()
for header in content[detector]:
det_info[header["keyword"]] = header["value"]
for field, keyword in det_mapping.items():
det_exposure_rec[field] = process_column(keyword, det_info)
for column, column_def in det_mapping.items():
det_exposure_rec[column] = process_column(column_def, det_info)

stmt = insert(ccdexposure_table).values(det_exposure_rec)
if update:
stmt = stmt.on_conflict_do_update(index_elements=["ccdexposure_id"], set_=det_exposure_rec)
else:
stmt = stmt.on_conflict_do_nothing()
logging.debug(str(stmt))
with engine.begin() as conn:
conn.execute(stmt)


def process_date(day_obs: str) -> None:
def process_date(day_obs: str, update: bool = False) -> None:
"""Process all headers from a given observation day (as YYYY-MM-DD).
Parameters
Expand All @@ -344,7 +357,7 @@ def process_date(day_obs: str) -> None:
d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/")
for dirpath, dirnames, filenames in d.walk():
for fname in filenames:
process_resource(d.join(fname))
process_resource(d.join(fname), update)


##################
Expand Down Expand Up @@ -412,6 +425,7 @@ def get_kafka_config() -> KafkaConfig:
engine = setup_postgres()
metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}")
exposure_table = Table("exposure", metadata_obj, autoload_with=engine)
ccdexposure_table = Table("ccdexposure", metadata_obj, autoload_with=engine)


bucket_prefix = os.environ.get("BUCKET_PREFIX", "")
Expand Down

0 comments on commit 7f3e472

Please sign in to comment.