Add support for LSST. Fixup Schema class and related.
troyraen committed Jul 1, 2024
1 parent 2e7c5fe commit b91c3c3
Showing 10 changed files with 241 additions and 136 deletions.
26 changes: 25 additions & 1 deletion CHANGELOG.md
@@ -11,13 +11,37 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## \[Unreleased\]

<!-- (none) -->

### Added

- Support for the LSST alert schema.
- `types_.Schema._from_yaml` class method and the related helpers `_local_schema_helper` and
`_lsst_schema_helper`.
- `types_.Schema.schemaless_alert_bytes` bool indicating whether the alert bytes are schemaless
and thus a `types_.Schema.definition` is required in order to serialize and deserialize them.
- `types_.Schema.manifest` containing the schemas.yml file loaded as a list of dicts.
- `types_.Schema.filter_map`, moved from the schema map's "FILTER_MAP".
- `Schema.origin` and schema-map key name "SCHEMA_ORIGIN" (see Changed).
- `types_.Schema.definition`. Not actually new, but repurposed (see Changed, Removed).

### Changed

- Make `Alert` method private, `add_id_attributes` -> `_add_id_attributes`.
- Changed some schema-map keys to include an underscore for clarity, e.g., "magerr" -> "mag_err"
(breaking change).
- Change method to private `Alert.add_id_attributes` -> `Alert._add_id_attributes`.
- Changed attribute name `types_.Schema.definition` -> `Schema.origin`. Related, changed
schema-map key name "SURVEY_SCHEMA" -> "SCHEMA_ORIGIN". Both for clarity.
- `types_.Schema.definition` is now used to hold the actual schema definition. Currently this is only
needed for Avro and so holds the dict loaded from the ".avsc" file(s).
- Update docstrings for clarity and accuracy.
- Improve type hints.
- Fix up Sphinx and rst to improve how docs are being rendered.

### Removed

- `types_.Schema.avsc`. Replaced by `types_.Schema.definition`.
- Schema-map keys "SURVEY_SCHEMA" (replaced), "TOPIC_SYNTAX" (dropped), "FILTER_MAP" (moved).

## \[v0.3.4\] - 2024-06-29

### Added
28 changes: 3 additions & 25 deletions pittgoogle/alert.py
@@ -238,31 +238,9 @@ def dict(self) -> Mapping:
         if self._dict is not None:
             return self._dict

-        # [TODO] Add a `required` attribute to types_.Schema (whether the schema is required in order to deserialize the alerts).
-        # deserialize self.msg.data (avro or json bytestring) into a dict.
-        # if self.msg.data is either (1) json; or (2) avro that contains the schema in the header,
-        # self.schema is not required for deserialization, so we want to be lenient.
-        # if self.msg.data is schemaless avro, deserialization requires self.schema.avsc to exist.
-        # currently, there is a clean separation between surveys:
-        # elasticc always requires self.schema.avsc; ztf never does.
-        # we'll check the survey name from self.schema.survey; but first we need to check whether
-        # the schema exists so we can try to continue without one instead of raising an error.
-        # we may want or need to handle this differently in the future.
-        try:
-            self.schema
-        except SchemaNotFoundError as exc:
-            LOGGER.warning(f"schema not found. attempting to deserialize without it. {exc}")
-            avro_schema = None
-        else:
-            if self.schema.survey in ["elasticc"]:
-                avro_schema = self.schema.avsc
-            else:
-                avro_schema = None
-
-        # if we have an avro schema, use it to deserialize and return
-        if avro_schema:
-            with io.BytesIO(self.msg.data) as fin:
-                self._dict = fastavro.schemaless_reader(fin, avro_schema)
+        if self.schema.schemaless_alert_bytes:
+            bytes_io = io.BytesIO(self.msg.data)
+            self._dict = fastavro.schemaless_reader(bytes_io, self.schema.definition)
             return self._dict

         # [TODO] this should be rewritten to catch specific errors
2 changes: 1 addition & 1 deletion pittgoogle/pubsub.py
@@ -239,7 +239,7 @@ def publish(self, alert: "Alert") -> int:
             avro_schema = None
         else:
             if alert.schema.survey in ["elasticc"]:
-                avro_schema = alert.schema.avsc
+                avro_schema = alert.schema.definition
             else:
                 avro_schema = None

88 changes: 53 additions & 35 deletions pittgoogle/registry.py
@@ -36,51 +36,69 @@ class ProjectIds:
class Schemas:
"""Registry of schemas used by Pitt-Google.
Example:
Examples:
.. code-block:: python
.. code-block:: python
# View list of registered schema names.
pittgoogle.Schemas.names
# View list of registered schema names.
pittgoogle.Schemas.names
# Load a schema (choose a name from above and substitute it below).
schema = pittgoogle.Schemas.get(schema_name="ztf")
# View more information about the schemas.
pittgoogle.Schemas.manifest
# Load a schema (choose a name from above and substitute it below).
schema = pittgoogle.Schemas.get(schema_name="ztf")
For Developers:
**For Developers**: :doc:`/for-developers/add-new-schema`
Register a New Schema
Schemas are defined in the yaml file [registry_manifests/schemas.yml](registry_manifests/schemas.yml).
To register a new schema, add a section to that file.
The fields are the same as those of a :class:`pittgoogle.types_.Schema`.
The `helper` field value must be the name of a valid `*_helper` method in :class:`pittgoogle.types_.Schema`.
If a suitable method does not already exist for your schema, add one by following the default as an example.
If your new helper method requires a new dependency, be sure to add it following
:doc:`/main/for-developers/manage-dependencies-poetry`
If you want to include your schema's ".avsc" file with the pittgoogle package, be sure to
commit the file(s) to the repo under the "schemas" directory.
----
"""

@classmethod
def get(cls, schema_name: str) -> types_.Schema:
"""Return the schema registered with name `schema_name`.
Raises
------
:class:`pittgoogle.exceptions.SchemaNotFoundError`
if a schema called `schema_name` is not found
@staticmethod
def get(schema_name: str) -> types_.Schema:
"""Return the schema with name matching `schema_name`.
Returns:
Schema:
Schema from the registry with name matching `schema_name`.
Raises:
SchemaNotFoundError:
If a schema with name matching `schema_name` is not found in the registry.
SchemaNotFoundError:
If a schema definition cannot be loaded but one will be required to read the alert bytes.
"""
for schema in SCHEMA_MANIFEST:
if schema["name"] != schema_name:
continue
return types_.Schema.from_yaml(schema_dict=schema)

# Return the schema with name == schema_name, if one exists.
for mft_schema in SCHEMA_MANIFEST:
if mft_schema["name"] == schema_name:
return types_.Schema._from_yaml(schema_dict=mft_schema)

# Return the schema with name ~= schema_name, if one exists.
for mft_schema in SCHEMA_MANIFEST:
# Case 1: Split by "." and check whether first and last parts match.
# Catches names like 'lsst.v<MAJOR>_<MINOR>.alert' where users replace '<..>' with custom values.
split_name, split_mft_name = schema_name.split("."), mft_schema["name"].split(".")
if all([split_mft_name[i] == split_name[i] for i in [0, -1]]):
return types_.Schema._from_yaml(schema_dict=mft_schema, name=schema_name)

# That's all we know how to check so far.
raise SchemaNotFoundError(
f"{schema_name} not found. for a list of valid names, use `pittgoogle.Schemas.names()`."
f"{schema_name} not found. For valid names, see `pittgoogle.Schemas.names`."
)

@staticmethod
def names() -> list[str]:
"""Return the names of all registered schemas."""
@property
def names(self) -> list[str]:
"""Names of all registered schemas.
A name from this list can be used with the :meth:`Schemas.get` method to load a schema.
Capital letters between angle brackets indicate that you should substitute your own
values. For example, to use the LSST schema listed here as ``"lsst.v<MAJOR>_<MINOR>.alert"``,
choose your own major and minor versions and use like ``pittgoogle.Schemas.get("lsst.v7_1.alert")``.
View available schema versions by following the `origin` link in :attr:`Schemas.manifest`.
"""
return [schema["name"] for schema in SCHEMA_MANIFEST]

@property
def manifest(self) -> list[dict]:
"""List of dicts containing the registration information of all known schemas."""
return SCHEMA_MANIFEST
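
The two-pass lookup that the new `Schemas.get` performs (exact name first, then a looser match on the first and last dot-separated parts) can be sketched on its own. This is a minimal standalone illustration under stated assumptions, not the package's code: `MANIFEST` is a hypothetical stand-in for `SCHEMA_MANIFEST`, `find_manifest_entry` is a made-up helper name, and the built-in `LookupError` stands in for `SchemaNotFoundError`.

```python
# Hypothetical stand-in for SCHEMA_MANIFEST; only the "name" field matters here.
MANIFEST = [
    {"name": "elasticc.v0_9_1.alert"},
    {"name": "elasticc.v0_9_1.brokerClassification"},
    {"name": "lsst.v<MAJOR>_<MINOR>.alert"},
    {"name": "ztf"},
]


def find_manifest_entry(schema_name: str, manifest: list = MANIFEST) -> dict:
    """Return the manifest entry for `schema_name` (exact match first, then loose match)."""
    # Pass 1: exact name match.
    for entry in manifest:
        if entry["name"] == schema_name:
            return entry

    # Pass 2: compare only the first and last dot-separated parts, so that
    # "lsst.v7_1.alert" resolves to the "lsst.v<MAJOR>_<MINOR>.alert" entry.
    requested = schema_name.split(".")
    for entry in manifest:
        registered = entry["name"].split(".")
        if registered[0] == requested[0] and registered[-1] == requested[-1]:
            return entry

    # Neither pass matched (LookupError is an assumption of this sketch).
    raise LookupError(f"{schema_name} not found.")
```

In this sketch, `find_manifest_entry("lsst.v7_1.alert")` returns the LSST entry. Because the second pass ignores everything between the first and last parts, any middle segment is accepted, so a name such as `"elasticc.v9_9.alert"` would resolve to the `"elasticc.v0_9_1.alert"` entry as well.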
63 changes: 47 additions & 16 deletions pittgoogle/registry_manifests/schemas.yml
@@ -8,24 +8,55 @@
#
#
# # TEMPLATE
# # See the registry.Schemas docstring for an explanation of the fields.
# - name: ""
# description: ""
# helper: ""
# path: ""
# - name: ''
# description: ''
# helper: '_local_schema_helper'
# path: ''
# schemaless_alert_bytes: false
# filter_map:
# 1: ''
# 2: ''
#
# ELASTICC alerts
- name: "elasticc.v0_9_1.alert"
description: "Avro schema of alerts published by ELAsTiCC."
path: "schemas/elasticc/elasticc.v0_9_1.alert.avsc"
- name: 'elasticc.v0_9_1.alert'
description: 'Avro schema of alerts published by ELAsTiCC.'
origin: 'https://github.com/LSSTDESC/elasticc/tree/main/alert_schema'
helper: '_local_schema_helper'
path: 'schemas/elasticc/elasticc.v0_9_1.alert.avsc'
schemaless_alert_bytes: true
#
# ELASTICC classifications
- name: "elasticc.v0_9_1.brokerClassification"
description: "Avro schema of alerts to be sent to DESC containing classifications of ELAsTiCC alerts."
path: "schemas/elasticc/elasticc.v0_9_1.brokerClassification.avsc"
# Rubin alerts
- name: "" # [TODO]
description:
- name: 'elasticc.v0_9_1.brokerClassification'
description: 'Avro schema of alerts to be sent to DESC containing classifications of ELAsTiCC alerts.'
origin: 'https://github.com/LSSTDESC/elasticc/tree/main/alert_schema'
helper: '_local_schema_helper'
path: 'schemas/elasticc/elasticc.v0_9_1.brokerClassification.avsc'
schemaless_alert_bytes: true
#
# LSST alerts
# - name: 'lsst.v7_1.alert'
- name: 'lsst.v<MAJOR>_<MINOR>.alert'
description: '' # [TODO]
origin: 'https://github.com/lsst/alert_packet/tree/main/python/lsst/alert/packet/schema'
helper: '_lsst_schema_helper'
schemaless_alert_bytes: true
# [FIXME] filter_map is probably int -> {u, g, r, i, z, y}. Check for sure and fill in below
filter_map:
1: ''
2: ''
3: ''
4: ''
5: ''
6: ''
#
# ZTF alerts
- name: "ztf"
description: "ZTF schema. The ZTF survey publishes alerts in Avro format with the schema attached in the header. Pitt-Google publishes ZTF alerts in json format. This schema covers both cases."
- name: 'ztf'
description: 'ZTF schema. The ZTF survey publishes alerts in Avro format with the schema attached in the header. Pitt-Google publishes ZTF alerts in json format. This schema covers both cases.'
origin: 'https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html'
helper: '_local_schema_helper'
schemaless_alert_bytes: false
path: null
filter_map:
1: g
2: r
3: i
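
As a rough illustration of how a `filter_map` entry like the ZTF one above might be used, the sketch below translates a survey's raw filter code into a band name. It assumes the manifest entry has already been loaded into a plain dict (a YAML loader would be expected to produce integer keys for `1`, `2`, `3`); `ZTF_ENTRY` and `band_name` are hypothetical names introduced only for this example.

```python
# The ztf manifest entry written as a plain dict (only the fields used here).
# This literal is an assumption about what loading schemas.yml would yield.
ZTF_ENTRY = {
    "name": "ztf",
    "schemaless_alert_bytes": False,
    "filter_map": {1: "g", 2: "r", 3: "i"},
}


def band_name(entry: dict, filter_code):
    """Translate a raw filter code into a band name, or None if it is unmapped."""
    # Entries without a filter_map (or with an empty one) map nothing.
    filter_map = entry.get("filter_map") or {}
    return filter_map.get(filter_code)
```

With this helper, `band_name(ZTF_ENTRY, 2)` gives `"r"`, and an unknown code or an entry without a `filter_map` gives `None` rather than raising.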
7 changes: 4 additions & 3 deletions pittgoogle/schemas/maps/decat.yml
@@ -1,5 +1,6 @@
+# We are no longer processing this survey. This file is left for reference.
 SURVEY: decat
-SURVEY_SCHEMA: https://github.com/rknop/decat_schema
+SCHEMA_ORIGIN: https://github.com/rknop/decat_schema
 TOPIC_SYNTAX: decat_yyyymmdd_2021A-0113 # replace yyyymmdd with the date
 FILTER_MAP:
   g DECam SDSS c0001 4720.0 1520.0: g
@@ -13,5 +14,5 @@ cutout_science: scicutout
 cutout_template: refcutout
 filter: filter
 mag: mag
-magerr: magerr
-magzp: magzp
+mag_err: magerr
+mag_zp: magzp
10 changes: 4 additions & 6 deletions pittgoogle/schemas/maps/elasticc.yml
@@ -1,8 +1,6 @@
 SURVEY: elasticc
-SURVEY_SCHEMA: https://github.com/LSSTDESC/elasticc/tree/main/alert_schema
+SCHEMA_ORIGIN: https://github.com/LSSTDESC/elasticc/tree/main/alert_schema
 SCHEMA_VERSION: v0_9_1
-TOPIC_SYNTAX:
-FILTER_MAP:
 alertid: alertId
 objectid: [diaObject, diaObjectId]
 source: diaSource
@@ -12,10 +10,10 @@ prv_forced_sources: prvDiaForcedSources
 mjd: midPointTai
 filter: filterName
 mag: magpsf
-magerr: sigmapsf
-magzp: magzpsci
+mag_err: sigmapsf
+mag_zp: magzpsci
 flux: psFlux
-fluxerr: psFluxErr
+flux_err: psFluxErr
 ra: ra
 dec: decl
 cutout_science:
30 changes: 30 additions & 0 deletions pittgoogle/schemas/maps/lsst.yml
@@ -0,0 +1,30 @@
SURVEY: lsst
SCHEMA_ORIGIN: 'https://github.com/lsst/alert_packet/tree/main/python/lsst/alert/packet/schema'
# [FIXME] Check everything below.
# IDs
alertid: alertId
sourceid: [diaSource, diaSourceId]
objectid: [diaObject, diaObjectId]
# Sources and Objects
source: diaSource
object: diaObject
prv_sources: prvDiaSources
prv_forced_sources: null
ss_object: ssObject
# Other
dec: [diaSource, dec]
dec_err: [diaSource, decErr]
filter: [diaSource, band]
flux: [diaSource, psfFlux]
flux_err: [diaSource, psfFluxErr]
mag: null
mag_err: null
mag_zp: null
mjd: [diaSource, midpointMjdTai]
ra: [diaSource, ra]
ra_err: [diaSource, raErr]
snr: [diaSource, snr]
# No cutouts in the alerts yet.
cutout_science: null
cutout_template: null
cutout_difference: null
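
The values in this schema map take three shapes: a string (a top-level key in the alert), a list (a path of nested keys), or null (the survey has no equivalent field). A minimal sketch of resolving all three against an alert dict might look like the following. `LSST_MAP` copies a few entries from the map above, while `get_field` and the sample alert in the usage note are hypothetical and only meant to illustrate the idea.

```python
from typing import Any

# A few entries from the lsst schema map above, as a plain dict.
LSST_MAP = {
    "alertid": "alertId",
    "sourceid": ["diaSource", "diaSourceId"],
    "flux": ["diaSource", "psfFlux"],
    "mag": None,
}


def get_field(alert: dict, schema_map: dict, generic_name: str) -> Any:
    """Look up `generic_name` in `alert` via the survey-specific path in `schema_map`."""
    path = schema_map.get(generic_name)
    if path is None:
        # The survey has no equivalent field (or the name is not in the map).
        return None
    if isinstance(path, str):
        # A bare string is a top-level key; treat it as a one-element path.
        path = [path]
    value = alert
    for key in path:
        # Walk down the nested dicts one key at a time.
        value = value[key]
    return value
```

For a made-up alert such as `{"alertId": 1, "diaSource": {"diaSourceId": 2, "psfFlux": 3.5}}`, `get_field(alert, LSST_MAP, "flux")` follows `diaSource` then `psfFlux`, and `get_field(alert, LSST_MAP, "mag")` yields `None` because the map marks that field as null.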
6 changes: 3 additions & 3 deletions pittgoogle/schemas/maps/ztf.yml
@@ -1,5 +1,5 @@
 SURVEY: ztf
-SURVEY_SCHEMA: https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html
+SCHEMA_ORIGIN: https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html
 TOPIC_SYNTAX: ztf_yyyymmdd_programid1 # replace yyyymmdd with the date
 FILTER_MAP:
   1: g
@@ -14,5 +14,5 @@ cutout_science: cutoutScience
 cutout_template: cutoutTemplate
 filter: fid
 mag: magpsf
-magerr: sigmapsf
-magzp: magzpsci
+mag_err: sigmapsf
+mag_zp: magzpsci