Skip to content

Commit

Permalink
Fixed connection and queries in Butler for exposures and visits. Adde…
Browse files Browse the repository at this point in the history
…d prints for debug. now run.sh script is working until topic query.
  • Loading branch information
glaubervila committed Apr 30, 2024
1 parent ab90afb commit edab709
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 69 deletions.
12 changes: 12 additions & 0 deletions .tmp/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
topics:
- name: lsst.sal.WeatherForecast.dailyTrend
fields: [ "private_efdStamp", "temperatureMax"]
index: 103 # optional
window: 0.0 # optional (sec)
function: mean
# other optional information
# - name: lsst.sal.Some_other_topic
# fields: [ "field3" ]
# function: whatever
exposure_table: exposure
visit_table: visit
11 changes: 0 additions & 11 deletions python/lsst/consdb/config.yaml

This file was deleted.

141 changes: 86 additions & 55 deletions python/lsst/consdb/transform_efd.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import argparse
from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any, Callable, Optional

import astropy.time
import lsst_efd_client
import pandas
import yaml
from lsst.daf.butler import Butler, DimensionRecord
from sqlalchemy import create_engine, Engine

import asyncio

class Summary:
# TODO define summary
Expand Down Expand Up @@ -64,24 +64,49 @@ def read_config(config_name: str) -> dict[str, Any]:
return yaml.safe_load(f)


def get_efd_values(
async def get_efd_values(
efd: lsst_efd_client.EfdClient,
topic: dict[str, Any],
start: astropy.time.Time,
end: astropy.time.Time,
) -> pandas.DataFrame:
window = astropy.time.TimeDelta(topic.get("window", 0.0), format="sec")
series = efd.select_time_series(
series = await efd.select_time_series(
topic["name"],
topic["fields"],
start - window,
end + window,
start.utc - window,
end.utc + window,
topic.get("index", None),
)

print(f"Topic: {topic['name']}")
print(f"Window: {window}")
print(f"Series: {len(series)}")
print(series)

return EfdValues(topic, window, series)


def process_interval(
def get_exposures_by_period(butler: Butler, instrument: str, start: astropy.time.Time, end: astropy.time.Time, limit: Optional[int] = None):

where_clause = f"instrument=instr and exposure.timespan OVERLAPS (T'{start}', T'{end}')"
return butler.registry.queryDimensionRecords(
"exposure",
where=where_clause,
bind=dict(instr=instrument)
).limit(limit)

def get_visits_by_period(butler: Butler, instrument: str, start: astropy.time.Time, end: astropy.time.Time, limit: Optional[int] = None):

where_clause = f"instrument=instr and visit.timespan OVERLAPS (T'{start}', T'{end}')"
return butler.registry.queryDimensionRecords(
"visit",
where=where_clause,
bind=dict(instr=instrument)
).limit(limit)


async def process_interval(
butler: Butler,
db: Engine,
efd: lsst_efd_client.EfdClient,
Expand All @@ -90,47 +115,57 @@ def process_interval(
start_time: str,
end_time: str,
) -> None:

print(f"Process Interval")

start = astropy.time.Time(start_time, format="isot")
end = astropy.time.Time(end_time, format="isot")

print(f"start: {start}")
print(f"end: {end}")

exposure_list = []
visit_list = []
min_topic_time = end
max_topic_time = start

where_clause = "instrument=instr and timespan OVERLAPS (start, end)"

for e in butler.queryDimensionRecords(
"exposure",
where=where_clause,
bind=dict(instr=instrument, start=start, end=end),
):
if e.timespan.end < end:
exposure_list.append(e)
min_topic_time = min(e.timespan.begin, min_topic_time)
max_topic_time = max(e.timespan.begin, max_topic_time)

for v in butler.queryDimensionRecords(
"visit", where=where_clause, bind=dict(instr=instrument, start=start, end=end)
):
if v.timespan.end < end:
visit_list.append(v)
min_topic_time = min(v.timespan.begin, min_topic_time)
max_topic_time = max(v.timespan.begin, max_topic_time)
for record in get_exposures_by_period(butler, instrument, start, end, limit=10):
if record.timespan.end < end:
exposure_list.append(record)
min_topic_time = min(record.timespan.begin, min_topic_time)
max_topic_time = max(record.timespan.begin, max_topic_time)

for record in get_visits_by_period(butler, instrument, start, end, limit=10):
if record.timespan.end < end:
visit_list.append(record)
min_topic_time = min(record.timespan.begin, min_topic_time)
max_topic_time = max(record.timespan.begin, max_topic_time)

print(f"Exposures: {len(exposure_list)}")
print(f"Visits: {len(visit_list)}")
print(f"Min Topic time: {min_topic_time}")
print(f"Max Topic time: {max_topic_time}")


# exposure_records = Records(db)
# print("Exposure Records:")
# print(exposure_records)
# visit_records = Records(db)
# for topic in config["topics"]:
# efd_values = get_efd_values(efd, topic, min_topic_time, max_topic_time)
# for e in exposure_list:
# summary = efd_values.summarize(e.timespan.begin, e.timespan.end)
# exposure_records.add(e, topic, summary)
# for v in visit_list:
# summary = efd_values.summarize(v.timespan.begin, v.timespan.end)
# visit_records.add(v, topic, summary)

# exposure_records.write(config["exposure_table"])
# visit_records.write(config["visit_table"])

exposure_records = Records(db)
visit_records = Records(db)
for topic in config["topics"]:
efd_values = get_efd_values(efd, topic, min_topic_time, max_topic_time)
for e in exposure_list:
summary = efd_values.summarize(e.timespan.begin, e.timespan.end)
exposure_records.add(e, topic, summary)
for v in visit_list:
summary = efd_values.summarize(v.timespan.begin, v.timespan.end)
visit_records.add(v, topic, summary)

exposure_records.write(config["exposure_table"])
visit_records.write(config["visit_table"])
efd_values = await get_efd_values(efd, topic, min_topic_time, max_topic_time)
print(efd_values)


def build_argparser() -> argparse.ArgumentParser:
Expand Down Expand Up @@ -170,35 +205,31 @@ def build_argparser() -> argparse.ArgumentParser:
return parser


def main() -> None:
async def main() -> None:
print("----------- MAIN -----------")
parser = build_argparser()
args = parser.parse_args()

print("Butler:")
butler = Butler(args.repo)
print(butler)
print(f"Butler: {butler}")

# print("DB engine:")
# db = create_engine(args.db_conn_str)
# print(db)
db = create_engine(args.db_conn_str)
print(f"DB engine: {db}")

# print("EFD:")
# efd = lsst_efd_client.EfdClient(args.efd_conn_str)
# print(efd)
efd = lsst_efd_client.EfdClient(args.efd_conn_str)
print(f"EFD: {efd}")

# print("Configs:")
# config = read_config(args.config_name)
# print(config)
config = read_config(args.config_name)
# print(f"Configs: {config}")

# process_interval(
# butler, db, efd, config, args.instrument, args.start_time, args.end_time
# )
await process_interval(
butler, db, efd, config, args.instrument, args.start_time, args.end_time
)


if __name__ == "__main__":

# Exemplo de execução
# python transform_efd.py -i LATISS -s 2024-01-01T4:00:00 -e 2024-01-05T05:00:00 -r /repo/embargo -d sqlite://test.db -E usdf_efd -c test.yaml

main()
asyncio.run(main())

4 changes: 1 addition & 3 deletions run.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
export DAF_BUTLER_REPOSITORY_INDEX=/sdf/group/rubin/shared/data-repos.yaml

python python/lsst/consdb/transform_efd.py -i LATISS -s 2024-01-01T4:00:00 -e 2024-01-05T05:00:00 -r /repo/embargo -d sqlite://test.db -E usdf_efd -c config.yaml
python python/lsst/consdb/transform_efd.py -i LATISS -s 2024-04-25T00:00:00 -e 2024-04-30T23:59:59 -r /repo/embargo -d sqlite://$PWD/.tmp/test.db -E usdf_efd -c $PWD/.tmp/config.yaml

0 comments on commit edab709

Please sign in to comment.