Skip to content

Commit

Permalink
Refactor Schema class, plus some other fix ups (#50)
Browse files Browse the repository at this point in the history
* move Schema class from types_.py -> schema.py

* add class SchemaHelpers

* change helper names
* _local_schema_helper -> default_schema_helper
* _lsst_schema_helper -> lsst_schema_helper

* docs: add schema module and some fixes

* rename SchemaNotFoundError -> SchemaError

* add custom Schema classes to de/serialize

* cleanup imports

* add a default schema

* remove OpenAlertError. use SchemaError instead

* hack in the schema for lsst.v7_1.alert

* remove lsst-alert-packet as a dependency

* fix CHANGELOG. prep release v0.3.7
  • Loading branch information
troyraen authored Jul 2, 2024
1 parent 3133b0e commit 9747522
Show file tree
Hide file tree
Showing 38 changed files with 5,953 additions and 663 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,30 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

(none)

## \[v0.3.7\] - 2024-07-02

### Added

- A default schema to be used when no schema is provided.
- Child classes for `schema.Schema` that are specific to different serialization formats.

### Fixed

- Support for the latest LSST schema version (lsst.v7_1.alert). Note that this is the only LSST schema
version currently supported.

### Changed

- Renamed `exceptions.SchemaNotFoundError` -> `exceptions.SchemaError`, repurposed for more general use.
- Updates to documentation.

### Removed

- Removed `exceptions.OpenAlertError`. Use `exceptions.SchemaError` instead.
- Removed dependency on `lsst-alert-packet` package. We cannot install this from a git repo and also
publish our package to PyPI. Need to figure out how to fix this. Without it,
'schema.SchemaHelper.lsst_auto_schema_helper' will not work.

## \[v0.3.6\] - 2024-07-01

### Changed
Expand Down
5 changes: 2 additions & 3 deletions docs/source/api-reference/index.rst
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
pittgoogle
==========

.. These are from the ___init__.py file. Would be nice to find a way to pull them in automatically
.. and also to make them hyperlinks.
.. Listing the subset of class the user will interact with most.
.. Is this what we want the package index page to look like? # [TODO]
.. autosummary::

.. autosummary::

pittgoogle.alert.Alert
pittgoogle.auth.Auth
pittgoogle.bigquery.Table
pittgoogle.pubsub.Consumer
pittgoogle.pubsub.Subscription
Expand Down
5 changes: 5 additions & 0 deletions docs/source/api-reference/schema.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pittgoogle.schema
=================

.. automodule:: pittgoogle.schema
:members:
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Add a schema map
Add a schema map
================

This page contains instructions for adding a new schema map.

Expand All @@ -7,18 +8,20 @@ Pitt-Google defines this set of generic field names so that we can write code th
different surveys without having to worry that one survey might call the (e.g.,) time field "MJD" while
another calls it "midPointTai". This map is what makes the :meth:`pittgoogle.alert.Alert.get` method work.

Currently, we define schema maps on a per-survey basis. :meth:`pittgoogle.types_.Schema.map` will load
the yaml file for the survey :meth:`pittgoogle.types_.Schema.survey` and return it as a dictionary.
Currently, we define schema maps on a per-survey basis. :meth:`pittgoogle.schema.Schema.map` will load
the yaml file for the survey :meth:`pittgoogle.schema.Schema.survey` and return it as a dictionary.
If you need something different, a more significant refactor will be required (left as an exercise
for the reader).

## Add a schema map for a new survey
Add a schema map for a new survey
---------------------------------

*pittgoogle/schemas/maps* is the directory containing the schema maps as yaml files.

To add a schema map, make a copy of the file *pittgoogle/schemas/maps/TEMPLATE.yml* and alter it.

### How to use the TEMPLATE.yml file
How to use the TEMPLATE.yml file
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

*pittgoogle/schemas/maps/TEMPLATE.yml* : Make a copy of this file and name it using the syntax
*<survey_name>.yml*.
Expand All @@ -39,7 +42,8 @@ Alter the new file, keeping these important things to keep in mind:
to help guide your decisions.
- Try to make decisions that result in schema maps that are conceptually consistent across surveys.

### Excluding or adding field names (keys) from the set of Pitt-Google generics
Excluding or adding field names (keys) from the set of Pitt-Google generics
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Excluding fields : If your survey/schema does not need a particular key, that key does does not need to
be included in your new schema map's yaml file. There's not much code in ``pittgoogle-client`` itself
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Add a new schema to the registry
Add a new schema to the registry
================================

This page contains instructions for adding a new schema to the registry so that it can be loaded
using :meth:`pittgoogle.registry.Schemas.get` and used to serialize and deserialize the alert bytes.
using :meth:`pittgoogle.Schemas.get` and used to serialize and deserialize the alert bytes.

You will need to update at least the "Required" files listed below, and potentially one or more of the
others. The schema format is expected to be either Avro or Json.

Expand All @@ -10,18 +12,20 @@ First, a naming guideline:
- Schema names are expected to start with the name of the survey. If the survey has more than one schema,
the survey name should be followed by a "." and then schema-specific specifier(s).

## Required
Required
--------

### pittgoogle/registry_manifests/schemas.yml
pittgoogle/registry_manifests/schemas.yml
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

*pittgoogle/registry_manifests/schemas.yml* is the manifest of registered schemas.

Add a new section to the manifest following the template provided there. The fields are the same as
those of a :class:`pittgoogle.types_.Schema`. The ``helper`` field must point to code that can find and load
those of a :class:`pittgoogle.schema.Schema`. The ``helper`` field must point to code that can find and load
the new schema definition; more information below.

Case 1: The schema definition is not *needed* in order to deserialize the alert bytes. This is true for
all Json and the Avro streams which attach the schema in the data header. Set
Case 1: The schema definition is not needed in order to deserialize the alert bytes. This is true for
all Json, and the Avro streams which attach the schema in the data header. Set
``schemaless_alert_bytes='false'``. Leave ``helper`` and ``path`` as defaults.

The rest of the cases assume the schema definition is required. This is true for "schemaless" Avro streams
Expand All @@ -40,23 +44,27 @@ usually only need to point to the main one. (3) If you've followed the recommend
``helper`` should work, but you should check (more below). If you need to implement your own helper
or update the existing, do it.

## Potentially Required
Potentially Required
--------------------

### pittgoogle/types_.py
pittgoogle/schema.py
^^^^^^^^^^^^^^^^^^^^

*pittgoogle/types_.py* is the file containing the :class:`pittgoogle.types_.Schema` class.
# [FIXME]
*pittgoogle/schema.py* is the file containing the :class:`pittgoogle.schema.Schema` class.

If ``schemaless_alert_bytes='false'``, the defaults (mostly null/None) should work and you can ignore
this file (skip to the next section).

A "helper" method must exist in :class:`pittgoogle.types_.Schema` that can find and load your new schema
A "helper" method must exist in :class:`pittgoogle.schema.Schema` that can find and load your new schema
definition. The ``helper`` field in the yaml manifest (above) must be set to the name of this method. If a
suitable helper method does not already already exist for your schema, add one to this file by following
existing helpers like :meth:`pittgoogle.types_.Schema._local_schema_helper` as examples. **If your helper
existing helpers like :meth:`pittgoogle.schema.Schema.default_schema_helper` as examples. **If your helper
method requires a new dependency, be sure to add it following
:doc:`/main/for-developers/manage-dependencies-poetry`.**

### pittgoogle/schemas/maps/
pittgoogle/schemas/maps/
^^^^^^^^^^^^^^^^^^^^^^^^

*pittgoogle/schemas/maps/* is the directory containing our schema maps.

Expand Down
48 changes: 48 additions & 0 deletions docs/source/for-developers/get-alerts-for-testing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
Get alerts for testing
======================

Setup
-----

.. code-block:: python
import pittgoogle
Get alerts from a Pub/Sub subscription
--------------------------------------

If you need to create the subscription, follow the example in :class:`pittgoogle.Subscription`.

Here are examples that get an alert from each of our "loop" streams:

.. code-block:: python
# Choose one of the following
loop_sub = pittgoogle.Subscription("rubin-loop", schema_name="lsst.v7_1.alert")
loop_sub = pittgoogle.Subscription("elasticc-loop", schema_name="elasticc.v0_9_1.alert")
loop_sub = pittgoogle.Subscription("ztf-loop", schema_name="ztf")
loop_sub.touch()
alert = loop.pull_batch(max_messages=1)[0]
Get alerts from a file on disk
-------------------------------

.. code-block:: python
# [TODO] Add code snippet
Get alerts from Cloud Storage
-----------------------------

.. code-block:: python
# [TODO] Add code snippet
Get alerts from BigQuery
-------------------------
.. code-block:: python
# [TODO] Add code snippet
1 change: 1 addition & 0 deletions docs/source/for-developers/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ For Developers
:maxdepth: 1

setup-environment
get-alerts-for-testing
add-new-schema
add-new-schema-map
manage-dependencies-poetry
Expand Down
2 changes: 1 addition & 1 deletion docs/source/for-developers/manage-dependencies-poetry.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Managing dependencies with Poetry
# Manage dependencies with Poetry

This page contains instructions for managing the `pittgoogle` package dependencies using [Poetry](https://python-poetry.org/).
Poetry was implemented in this repo in [pull #7](https://github.com/mwvgroup/pittgoogle-client/pull/7).
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ If you run into trouble, please
api-reference/exceptions
api-reference/pubsub
api-reference/registry
api-reference/schema
api-reference/types_
api-reference/utils
2 changes: 1 addition & 1 deletion pittgoogle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
except ImportError: # for Python<3.8
import importlib_metadata as metadata

from . import alert, auth, bigquery, exceptions, pubsub, registry, types_, utils
from . import alert, auth, bigquery, exceptions, pubsub, registry, schema, types_, utils
from .alert import Alert
from .auth import Auth
from .bigquery import Table
Expand Down
74 changes: 25 additions & 49 deletions pittgoogle/alert.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
# -*- coding: UTF-8 -*-
"""Classes for working with astronomical alerts."""
import base64
import datetime
import importlib.resources
import io
import logging
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Mapping, Union

import fastavro
import attrs
import google.cloud.pubsub_v1
from attrs import define, field

from . import registry, types_, utils
from .exceptions import BadRequest, OpenAlertError, SchemaNotFoundError
from . import registry, types_, exceptions
from .schema import Schema # so 'schema' module doesn't clobber 'Alert.schema' attribute

if TYPE_CHECKING:
import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run
Expand All @@ -22,7 +20,7 @@
PACKAGE_DIR = importlib.resources.files(__package__)


@define(kw_only=True)
@attrs.define(kw_only=True)
class Alert:
"""Container for an astronomical alert.
Expand All @@ -47,16 +45,16 @@ class Alert:
----
"""

_dict: Mapping | None = field(default=None)
_attributes: Mapping[str, str] | None = field(default=None)
schema_name: str | None = field(default=None)
msg: google.cloud.pubsub_v1.types.PubsubMessage | types_.PubsubMessageLike | None = field(
default=None
_dict: Mapping | None = attrs.field(default=None)
_attributes: Mapping[str, str] | None = attrs.field(default=None)
schema_name: str | None = attrs.field(default=None)
msg: google.cloud.pubsub_v1.types.PubsubMessage | types_.PubsubMessageLike | None = (
attrs.field(default=None)
)
path: Path | None = field(default=None)
path: Path | None = attrs.field(default=None)
# Use "Union" because " | " is throwing an error when combined with forward references.
_dataframe: Union["pd.DataFrame", None] = field(default=None)
_schema: types_.Schema | None = field(default=None, init=False)
_dataframe: Union["pd.DataFrame", None] = attrs.field(default=None)
_schema: Schema | None = attrs.field(default=None, init=False)

# ---- class methods ---- #
@classmethod
Expand Down Expand Up @@ -109,17 +107,17 @@ def index():
"""
# check whether received message is valid, as suggested by Cloud Run docs
if not envelope:
raise BadRequest("Bad Request: no Pub/Sub message received")
raise exceptions.BadRequest("Bad Request: no Pub/Sub message received")
if not isinstance(envelope, dict) or "message" not in envelope:
raise BadRequest("Bad Request: invalid Pub/Sub message format")
raise exceptions.BadRequest("Bad Request: invalid Pub/Sub message format")

# convert the message publish_time string -> datetime
# occasionally the string doesn't include microseconds so we need a try/except
publish_time = envelope["message"]["publish_time"].replace("Z", "+00:00")
try:
publish_time = datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S.%f%z")
publish_time = datetime.datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S.%f%z")
except ValueError:
publish_time = datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S%z")
publish_time = datetime.datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S%z")

return cls(
msg=types_.PubsubMessageLike(
Expand Down Expand Up @@ -232,26 +230,11 @@ def dict(self) -> Mapping:
The alert data as a dictionary.
Raises:
OpenAlertError:
SchemaError:
If unable to deserialize the alert bytes.
"""
if self._dict is not None:
return self._dict

if self.schema.schemaless_alert_bytes:
bytes_io = io.BytesIO(self.msg.data)
self._dict = fastavro.schemaless_reader(bytes_io, self.schema.definition)
return self._dict

# [TODO] this should be rewritten to catch specific errors
# for now, just try avro then json, catching basically all errors in the process
try:
self._dict = utils.Cast.avro_to_dict(self.msg.data)
except Exception:
try:
self._dict = utils.Cast.json_to_dict(self.msg.data)
except Exception:
raise OpenAlertError("failed to deserialize the alert bytes")
if self._dict is None:
self._dict = self.schema.deserialize(self.msg.data)
return self._dict

@property
Expand Down Expand Up @@ -306,22 +289,15 @@ def sourceid(self) -> str | int:
return self.get("sourceid")

@property
def schema(self) -> types_.Schema:
"""Return the schema from the :class:`pittgoogle.Schemas` registry.
def schema(self) -> Schema:
"""Return the schema from the :class:`pittgoogle.registry.Schemas` registry.
Raises:
pittgoogle.exceptions.SchemaNotFoundError:
SchemaError:
If the `schema_name` is not supplied or a schema with this name is not found.
"""
if self._schema is not None:
return self._schema

# need to load the schema. raise an error if no schema_name given
if self.schema_name is None:
raise SchemaNotFoundError("a schema_name is required")

# this also may raise SchemaNotFoundError
self._schema = registry.Schemas.get(self.schema_name)
if self._schema is None:
self._schema = registry.Schemas.get(self.schema_name)
return self._schema

# ---- methods ---- #
Expand Down
6 changes: 1 addition & 5 deletions pittgoogle/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,5 @@ class CloudConnectionError(Exception):
"""Raised when a problem is encountered while trying to a Google Cloud resource."""


class OpenAlertError(Exception):
"""Raised when unable to deserialize a Pub/Sub message payload."""


class SchemaNotFoundError(Exception):
class SchemaError(Exception):
"""Raised when a schema with a given name is not found in the registry."""
Loading

0 comments on commit 9747522

Please sign in to comment.