Skip to content

Commit

Permalink
start rubin schema
Browse files Browse the repository at this point in the history
  • Loading branch information
troyraen committed Jun 29, 2024
1 parent 86440be commit 03d33db
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pittgoogle/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def purge(self):
msg = (
"WARNING: This is permanent.\n"
f"Are you sure you want to purge all messages from the subscription\n{self.path}?\n"
"(y/n): "
"(y/[n]): "
)
proceed = input(msg)
if proceed.lower() == "y":
Expand Down
42 changes: 32 additions & 10 deletions pittgoogle/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,38 @@ class ProjectIds:

@define(frozen=True)
class Schemas:
"""Registry of schemas used by Pitt-Google."""
"""Registry of schemas used by Pitt-Google.
Usage
-----
.. code-block:: python
# View list of registered schema names.
pittgoogle.Schemas.names
# Load a schema (choose a name from above and substitute it below).
schema = pittgoogle.Schemas.get(schema_name="ztf")
For Developers : Register a New Schema
--------------------------------------
Schemas are defined in the yaml file [registry_manifests/schemas.yml](registry_manifests/schemas.yml).
To register a new schema, add a section to that file.
The fields are the same as those of a :class:`pittgoogle.types_.Schema`.
The `helper` field value must be the name of a valid `*_helper` method in :class:`pittgoogle.types_.Schema`.
If a suitable method does not already exist for your schema, add one by following the default as an example.
If your new helper method requires a new dependency, be sure to add it following
# [FIXME]
[/docs/source/for-developers/manage-dependencies-poetry.md](/docs/source/for-developers/manage-dependencies-poetry.md).
If you want to include your schema's ".avsc" file with the pittgoogle package, be sure to
commit the file(s) to the repo under the "schemas" directory.
"""

@classmethod
def get(cls, schema_name: str) -> types_.Schema:
"""Return the registered schema called `schema_name`.
"""Return the schema registered with name `schema_name`.
Raises
------
Expand All @@ -48,18 +75,13 @@ def get(cls, schema_name: str) -> types_.Schema:
for schema in SCHEMA_MANIFEST:
if schema["name"] != schema_name:
continue

return types_.Schema(
name=schema["name"],
description=schema["description"],
path=PACKAGE_DIR / schema["path"] if schema["path"] is not None else None,
)
return types_.Schema.from_yaml(schema_dict=schema)

raise SchemaNotFoundError(
f"{schema_name} not found. for a list of valid names, use `pittgoogle.Schemas.names()`."
)

@classmethod
def names(cls) -> list[str]:
@staticmethod
def names() -> list[str]:
"""Return the names of all registered schemas."""
return [schema["name"] for schema in SCHEMA_MANIFEST]
90 changes: 69 additions & 21 deletions pittgoogle/types_.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Functions to support working with alerts and related data."""
import importlib.resources
import logging
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Callable, Optional

import fastavro
import yaml
Expand All @@ -20,15 +20,58 @@
class Schema:
"""Class for an individual schema.
This class is not intended to be used directly.
Use `pittgoogle.registry.Schemas` instead.
This class is not intended to be used directly. Use `pittgoogle.Schemas` instead.
"""

"""The name of the schema."""
name: str = field()
"""The description of the schema."""
description: str = field()
"""Name of the `Schema` helper method used to load the schema."""
_helper: str = field(default="_local_avsc_helper")
"""Path where the helper can find the schema."""
path: Optional["Path"] = field(default=None)
"""Mapping of Pitt-Google's generic field names to survey-specific field names."""
_map: Optional[dict] = field(default=None, init=False)
_avsc: Optional[dict] = field(default=None, init=False)
"""The Avro schema loaded by the helper or None if no Avro schema exists."""
avsc: Optional[dict] = field(default=None, init=False)

@classmethod
def from_yaml(cls, schema_dict: dict) -> "Schema":
# initialize the class, then let the helper finish up
schema = cls(**schema_dict)
helper = getattr(cls, schema._helper)
schema = helper(schema)
return schema

@staticmethod
def _local_avsc_helper(schema: "Schema") -> "Schema":
"""Resolve `schema.path`. If it points to a valid ".avsc" file, load it into `schema.avsc`."""
# Resolve the path. If it is not None, this helper expects it to be the path to
# a ".avsc" file relative to the pittgoogle package directory.
schema.path = PACKAGE_DIR / schema.path if schema.path is not None else None

# Load the avro schema, if the file exists.
invalid_path = (
(schema.path is None) or (schema.path.suffix != ".avsc") or (not schema.path.is_file())
)
if invalid_path:
schema.avsc = None
else:
schema.avsc = fastavro.schema.load_schema(schema.path)

return schema

@staticmethod
def _lsst_avsc_helper(schema: "Schema") -> "Schema":
"""Resolve `schema.path`. If it points to a valid ".avsc" file, load the Avro schema."""
import lsst.alert.packet.schema

major, minor = schema.path.split(".") # [FIXME]
schema.path = lsst.alert.packet.schema.get_schema_path(major, minor)
schema.avsc = lsst.alert.packet.schema.Schema.from_file(schema.path)

return schema

@property
def survey(self) -> str:
Expand All @@ -51,32 +94,37 @@ def map(self) -> dict:
raise ValueError(f"no schema map found for schema name '{self.name}'")
return self._map

@property
def avsc(self) -> Optional[dict]:
"""The Avro schema loaded from the file at `self.path`, or None if a valid file cannot be found."""
# if the schema has already been loaded, return it
if self._avsc is not None:
return self._avsc
# @property
# def avsc(self) -> Optional[dict]:
# """The Avro schema loaded from the file at `self.path`, or None if a valid file cannot be found."""
# # if the schema has already been loaded, return it
# if self._avsc is not None:
# return self._avsc

# if self.path does not point to an existing avro schema file, return None
if (self.path is None) or (self.path.suffix != ".avsc") or (not self.path.is_file()):
return None
# # if self.path does not point to an existing avro schema file, return None
# if (self.path is None) or (self.path.suffix != ".avsc") or (not self.path.is_file()):
# return None

# load the schema and return it
self._avsc = fastavro.schema.load_schema(self.path)
return self._avsc
# # load the schema and return it
# self._avsc = fastavro.schema.load_schema(self.path)
# return self._avsc


@define(frozen=True)
class PubsubMessageLike:
"""Container for an incoming Pub/Sub message that mimics a `google.cloud.pubsub_v1.types.PubsubMessage`.
This class is not intended to be used directly.
Use :class:`pittgoogle.Alert` instead.
Purpose:
It is convenient for the :class:`pittgoogle.Alert` class to work with a message as a
`pubsub_v1.types.PubsubMessage`. However, there are many ways to obtain an alert that do
not result in a `pubsub_v1.types.PubsubMessage` (e.g., an alert packet loaded from disk or
an incoming message to a Cloud Functions or Cloud Run module). In those cases, this class
is used to create an object with the same attributes as a `pubsub_v1.types.PubsubMessage`.
This object is then assigned to the `msg` attribute of the `Alert`.
`google.cloud.pubsub_v1.types.PubsubMessage`. However, there are many ways to obtain an alert
that do not result in a `google.cloud.pubsub_v1.types.PubsubMessage` (e.g., an alert packet
loaded from disk or an incoming message to a Cloud Functions or Cloud Run module). In those
cases, this class is used to create an object with the same attributes as a
`google.cloud.pubsub_v1.types.PubsubMessage`. This object is then assigned to the `msg`
attribute of the `Alert`.
"""

data: bytes = field()
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ google-cloud-bigquery = ">=3.11.2"
google-cloud-pubsub = ">=2.17.1"
pandas = ">=1.5" # v1.5.1 required by supernnova v3.0.1
tabulate = ">=0.9"
# lsst-alert-packet for the LSST schema helper.
# The PyPI version of this looks out of date so install from a git tag instead.
# [FIXME] This is pinned... is there a way to get the latest tag automatically?
lsst-alert-packet = { git = "https://github.com/lsst/alert_packet.git", tag = "w.2024.26" }

[tool.poetry.group.docs]
optional = true
Expand Down

0 comments on commit 03d33db

Please sign in to comment.