Skip to content

Commit

Permalink
Refactor and extend publish/query.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed Apr 27, 2024
1 parent 458d145 commit a3a68e3
Show file tree
Hide file tree
Showing 3 changed files with 544 additions and 80 deletions.
294 changes: 239 additions & 55 deletions python/lsst/consdb/pqclient.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,242 @@
# This file is part of consdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import os
from pandas import DataFrame
from typing import Any
from urllib.parse import quote

import requests
from requests.exceptions import RequestException
from typing import Any, Iterable
from urllib.parse import urljoin

session = requests.Session()
base_url = os.environ["CONSDB_URL"]


def insert(table: str, values: dict[str, Any], **kwargs):
values.update(kwargs)
# check values against schema for table
data = {"table": table, "values": values}
url = urljoin(base_url, "insert")
try:
response = requests.post(url, json=data)
except RequestException as e:
raise e
response.raise_for_status()


def query(
tables: str | Iterable[str],
columns: str | Iterable[str],
*,
where: str | None = None,
join: str | None = None
) -> list[Any]:
if isinstance(tables, str):
tables = [tables]
if isinstance(columns, str):
columns = [columns]
url = urljoin(base_url, "query")
data = {"tables": tables, "columns": columns, "where": where, "join": join}
try:
response = requests.post(url, json=data)
except RequestException as e:
raise e
try:
from astropy.table import Table

__all__ = ["ConsDbClient"]


logger = logging.getLogger(__name__)


def urljoin(*args: str) -> str:
"""Join parts of a URL with slashes.
Does not do any quoting. Mostly to remove a level of list-making.
Parameters
----------
*args: `str`
Each parameter is a URL part.
Returns
-------
url: `str`
The joined URL.
"""
return "/".join(args)


class ConsDbClient:
"""A client library for accessing the Consolidated Database.
This library provides a basic interface for using flexible metadata
(key/value pairs associated with observation ids from an observation
type table), determining the schema of ConsDB tables, querying the
ConsDB using a general SQL SELECT statement, and inserting into
ConsDB tables.
Parameters
----------
url: `str`, optional
Base URL of the Web service, defaults to the value of environment
variable ``LSST_CONSDB_PQ_URL``.
Notes
-----
This client is a thin layer over a Web service, which avoids having a
dependency on database drivers.
It enforces the return of query results as Astropy Tables.
"""

def __init__(self, url: str | None = None):
self.session = requests.Session()
if url is None:
self.url = os.environ["LSST_CONSDB_PQ_URL"]
else:
self.url = url
self.url = self.url.rstrip("/")

def _handle_get(
self, url: str, query: dict[str, str | list[str]] | None = None
) -> Any:
"""Utility function to submit GET requests.
Parameters
----------
url: `str`
URL to GET.
query: `dict` [`str`, `str` | `list` [`str`]], optional
Query parameters to attach to the URL.
Raises
------
requests.exceptions.RequestException
Raised if any kind of connection error occurs.
requests.exceptions.HTTPError
Raised if a non-successful status is returned.
requests.exceptions.JSONDecodeError
Raised if the result does not decode as JSON.
Returns
-------
result: `Any`
Result of decoding the Web service result content as JSON.
"""
logger.debug(f"GET {url}")
try:
response = self.session.get(url, params=query)
except requests.exceptions.RequestException as e:
raise e
response.raise_for_status()
return response.json()

def _handle_post(self, url: str, data: dict[str, Any]) -> requests.Response:
"""Utility function to submit POST requests.
Parameters
----------
url: `str`
URL to POST.
data: `dict` [`str`, `Any`]
Key/value pairs of data to POST.
Raises
------
requests.exceptions.RequestException
Raised if any kind of connection error occurs.
requests.exceptions.HTTPError
Raised if a non-successful status is returned.
Returns
-------
result: `requests.Response`
The raw Web service result object.
"""
logger.debug(f"POST {url}: {data}")
try:
response = self.session.post(url, json=data)
except requests.exceptions.RequestException as e:
raise e
response.raise_for_status()
except Exception as ex:
print(response.content.decode())
raise ex
arr = response.json()
return DataFrame(arr[1:], columns=arr[0])


def schema(table: str):
url = urljoin(base_url, "schema/")
url = urljoin(url, table)
try:
response = requests.get(url)
except RequestException as e:
raise e
response.raise_for_status()
return response.json()
return response

@staticmethod
def compute_flexible_metadata_table_name(instrument: str, obs_type: str) -> str:
"""Public utility for computing flexible metadata table names.
Each instrument and observation type made with that instrument can
have a flexible metadata table. This function is useful when
issuing SQL queries, and it avoids a round-trip to the server.
Parameters
----------
instrument: `str`
Name of the instrument (e.g. ``LATISS``).
obs_type: `str`
Name of the observation type (e.g. ``Exposure``).
Returns
-------
table_name: `str`
Name of the appropriate flexible metadata table.
"""
return f"cdb_{instrument}.{obs_type}_flexdata"

def add_flexible_metadata_key(
self, instrument: str, obs_type: str, key: str, dtype: str, doc: str
) -> requests.Response:
data = {"key": key, "dtype": dtype, "doc": doc}
url = urljoin(self.url, "flex", quote(instrument), quote(obs_type), "addkey")
return self._handle_post(url, data)

def get_flexible_metadata_keys(
self, instrument: str, obs_type: str
) -> list[tuple[str, str, str]]:
url = urljoin(self.url, "flex", quote(instrument), quote(obs_type), "schema")
return self._handle_get(url)

def get_flexible_metadata(
self, instrument: str, obs_type: str, obs_id: int, keys: list[str] | None = None
) -> list[Any]:
url = urljoin(
self.url,
"flex",
quote(instrument),
quote(obs_type),
"obs",
quote(str(obs_id)),
)
return self._handle_get(url, {"k": keys} if keys else None)

def insert_flexible_metadata(
self,
instrument: str,
obs_type: str,
obs_id: int,
values: dict[str, Any],
**kwargs,
) -> requests.Response:
values.update(kwargs)
data = values
url = urljoin(
self.url,
"flex",
quote(instrument),
quote(obs_type),
"obs",
quote(str(obs_id)),
)
return self._handle_post(url, data)

def insert(
self,
instrument: str,
table: str,
obs_id: int,
values: dict[str, Any],
allow_update=False,
**kwargs,
) -> requests.Response:
values.update(kwargs)
data = {"table": table, "obs_id": obs_id, "values": values}
op = "upsert" if allow_update else "insert"
url = urljoin(self.url, op, quote(instrument))
return self._handle_post(url, data)

def query(self, query: str) -> Table:
url = urljoin(self.url, "query")
data = {"query": query}
result = self._handle_post(url, data).json()
return Table(rows=result["data"], names=result["columns"])

def schema(self, instrument: str, table: str) -> list[tuple[str, str, str]]:
url = urljoin(self.url, "schema", quote(instrument), quote(table))
return self._handle_get(url)
Loading

0 comments on commit a3a68e3

Please sign in to comment.