diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index c5ac25d2..9dc14883 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -5,7 +5,7 @@ on: - main tags: - v* - pull_request: + pull_request: jobs: push: @@ -16,7 +16,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Build hinfo image run: | diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 2b20981b..3733b421 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -8,12 +8,12 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: - python-version: 3.7 + python-version: '3.11' - name: Install run: pip install -r <(curl https://raw.githubusercontent.com/lsst/linting/main/requirements.txt) diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo index ca29f350..e2a988f4 100644 --- a/Dockerfile.hinfo +++ b/Dockerfile.hinfo @@ -3,10 +3,10 @@ FROM lsstsqre/newinstall:${RUBINENV_VERSION} ARG OBS_LSST_VERSION ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_06} USER lsst -RUN source loadLSST.bash && mamba install confluent-kafka +RUN source loadLSST.bash && pip install confluent-kafka RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst -COPY python/lsst/consdb/hinfo.py ./hinfo/ +COPY python/lsst/consdb/hinfo*.py ./hinfo/ # Environment variables that must be set: # POSTGRES_URL INSTRUMENT SITE -ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python hinfo/hinfo.py" ] +ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python .hinfo/hinfo-latiss.py" ] diff --git a/Dockerfile.server b/Dockerfile.server new file mode 100644 index 00000000..9c2dd9c5 --- /dev/null +++ b/Dockerfile.server @@ -0,0 +1,8 @@ +FROM python:3.11 +RUN pip install flask gunicorn sqlalchemy +WORKDIR /consdb-server +COPY src/server.py /consdb-server/ +# Environment variables that must be set: +# POSTGRES_URL +ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "server:app" ] + diff --git a/python/lsst/consdb/client.py b/python/lsst/consdb/client.py index 0e1beb34..fa7d8fcb 100644 --- a/python/lsst/consdb/client.py +++ b/python/lsst/consdb/client.py @@ -1,7 +1,7 @@ -import json import os from pandas import DataFrame import requests +from requests.exceptions import RequestException from typing import Any, Iterable from urllib.parse import urljoin @@ -16,8 +16,8 @@ def insert(table: str, values: dict[str, Any], **kwargs): url = urljoin(base_url, "insert") try: response = requests.post(url, json=data) - except: - raise + except RequestException as e: + raise e response.raise_for_status() @@ -36,13 +36,13 @@ def query( data = {"tables": tables, "columns": columns, "where": where, "join": join} try: response = requests.post(url, json=data) - except: - raise + except RequestException as e: + raise e try: response.raise_for_status() - except: + except Exception as ex: print(response.content.decode()) - raise + raise ex arr = response.json() return DataFrame(arr[1:], columns=arr[0]) @@ -52,7 +52,7 @@ def schema(table: str): url = urljoin(url, table) try: response = requests.get(url) - except: - raise + except RequestException as e: + raise e response.raise_for_status() return response.json() diff --git a/python/lsst/consdb/header_proc.py b/python/lsst/consdb/header_proc.py index 551684e7..4c9ae206 100644 --- a/python/lsst/consdb/header_proc.py +++ b/python/lsst/consdb/header_proc.py @@ -9,7 +9,7 @@ import httpx import kafkit from lsst.resources import ResourcePath -from sqlalchemy import create_engine, text +from sqlalchemy import create_engine if TYPE_CHECKING: import lsst.resources @@ -30,9 +30,11 @@ def process_resource(resource: lsst.resources.ResourcePath) -> None: content = json.loads(resource.read()) with engine.begin() as conn: + print(conn) # TODO get all fields and tables, do as a transaction # conn.execute( - # text("INSERT INTO exposure (a, b, c, d, e)" " VALUES(:a, :b, :c, :d, :e)"), + # text("INSERT INTO exposure (a, b, c, d, e)" + # " VALUES(:a, :b, :c, :d, :e)"), # [dict(a=content["something"], b=2, c=3, d=4, e=5)], # ) print(f"Processing {resource}: {content[0:100]}") @@ -51,7 +53,7 @@ async def main() -> None: consumer = aiokafka.AIOKafkaConsumer( topic, bootstrap_servers=kafka_cluster, - group_id=kafka_group_id, + group_id=str(kafka_group_id), auto_offset_reset="earliest", ) await consumer.start() diff --git a/python/lsst/consdb/hinfo-latiss.py b/python/lsst/consdb/hinfo-latiss.py index 1f7623e0..df30508d 100644 --- a/python/lsst/consdb/hinfo-latiss.py +++ b/python/lsst/consdb/hinfo-latiss.py @@ -15,21 +15,24 @@ # import Kafka interface - def ninety_minus(angle: float) -> float: return 90.0 - angle + def tai_convert(t: str) -> datetime: return Time(t, format="isot", scale="tai").datetime + def tai_mean(start: str, end: str) -> datetime: s = Time(start, format="isot", scale="tai") e = Time(end, format="isot", scale="tai") return (s + (e - s) / 2).datetime + def mean(*iterable: Iterable[Any]) -> Any: return sum(iterable) / len(iterable) + def logical_or(*bools: Iterable[int | str | None]) -> bool: return any([b == 1 or b == "1" for b in bools]) @@ -77,8 +80,9 @@ def logical_or(*bools: Iterable[int | str | None]) -> bool: "dome_azimuth": "DOMEAZ", "shut_lower": "SHUTLOWR", "shut_upper": "SHUTUPPR", -# "temp_set": "TEMP_SET", - "simulated": (logical_or, "SIMULATE ATMCS", "SIMULATE ATHEXAPOD", "SIMULAT ATPNEUMATICS", "SIMULATE ATDOME", "SIMULATE ATSPECTROGRAPH"), + # "temp_set": "TEMP_SET", + "simulated": (logical_or, "SIMULATE ATMCS", "SIMULATE ATHEXAPOD", "SIMULAT ATPNEUMATICS", + "SIMULATE ATDOME", "SIMULATE ATSPECTROGRAPH"), } # LATISS_DETECTOR_MAPPING = { @@ -93,9 +97,9 @@ def logical_or(*bools: Iterable[int | str | None]) -> bool: } TOPIC_MAPPING = { - "LATISS": "ATHeaderService", - "LSSTComCam": "CCHeaderService", - "LSSTCam": "MTHeaderService", + "LATISS": "ATHeaderService", + "LSSTComCam": "CCHeaderService", + "LSSTCam": "MTHeaderService", } @@ -107,15 +111,16 @@ def logical_or(*bools: Iterable[int | str | None]) -> bool: def process_keyword(keyword: str | tuple, info: dict) -> Any: - if type(keyword) == str: + if type(keyword) is str: if keyword in info: return info[keyword] - elif type(keyword) == tuple: + elif type(keyword) is tuple: fn = keyword[0] args = keyword[1:] if all([a in info for a in args]): return fn(*[info[a] for a in args]) + def process_resource(resource: ResourcePath) -> None: global engine, exposure_table @@ -145,7 +150,8 @@ def process_resource(resource: ResourcePath) -> None: stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() with engine.begin() as conn: - result = conn.execute(stmt) + conn.execute(stmt) + # result = conn.execute(stmt) # print(exposure_rec) diff --git a/python/lsst/consdb/server.py b/python/lsst/consdb/server.py index 106c9007..5344db69 100644 --- a/python/lsst/consdb/server.py +++ b/python/lsst/consdb/server.py @@ -1,6 +1,5 @@ from flask import Flask, request -from sqlalchemy import create_engine, MetaData, Table -from sqlalchemy.dialects.postgresql import insert +from sqlalchemy import create_engine, MetaData import sqlalchemy.exc