diff --git a/pittgoogle/registry.py b/pittgoogle/registry.py index 29cb75f..bcc0910 100644 --- a/pittgoogle/registry.py +++ b/pittgoogle/registry.py @@ -34,11 +34,37 @@ class ProjectIds: @define(frozen=True) class Schemas: - """Registry of schemas used by Pitt-Google.""" + """Registry of schemas used by Pitt-Google. + + Example: + + .. code-block:: python + + # View list of registered schema names. + pittgoogle.Schemas.names() + + # Load a schema (choose a name from above and substitute it below). + schema = pittgoogle.Schemas.get(schema_name="ztf") + + + For Developers: + + Register a New Schema + + Schemas are defined in the yaml file ``registry_manifests/schemas.yml``. + To register a new schema, add a section to that file. + The fields are the same as those of a :class:`pittgoogle.types_.Schema`. + The `helper` field value must be the name of a valid `*_helper` method in :class:`pittgoogle.types_.Schema`. + If a suitable method does not already exist for your schema, add one by following the default as an example. + If your new helper method requires a new dependency, be sure to add it following + :doc:`/main/for-developers/manage-dependencies-poetry`. + If you want to include your schema's ".avsc" file with the pittgoogle package, be sure to + commit the file(s) to the repo under the "schemas" directory. + """ @classmethod def get(cls, schema_name: str) -> types_.Schema: - """Return the registered schema called `schema_name`. + """Return the schema registered with name `schema_name`. Raises ------ @@ -48,18 +74,13 @@ def get(cls, schema_name: str) -> types_.Schema: for schema in SCHEMA_MANIFEST: if schema["name"] != schema_name: continue - - return types_.Schema( - name=schema["name"], - description=schema["description"], - path=PACKAGE_DIR / schema["path"] if schema["path"] is not None else None, - ) + return types_.Schema.from_yaml(schema_dict=schema) raise SchemaNotFoundError( f"{schema_name} not found. 
for a list of valid names, use `pittgoogle.Schemas.names()`." ) - @classmethod - def names(cls) -> list[str]: + @staticmethod + def names() -> list[str]: """Return the names of all registered schemas.""" return [schema["name"] for schema in SCHEMA_MANIFEST] diff --git a/pittgoogle/types_.py b/pittgoogle/types_.py index a7da725..88a15b9 100644 --- a/pittgoogle/types_.py +++ b/pittgoogle/types_.py @@ -2,7 +2,7 @@ """Classes defining new types.""" import importlib.resources import logging -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Callable, Optional import fastavro import yaml @@ -20,15 +20,58 @@ class Schema: """Class for an individual schema. - This class is not intended to be used directly. - Use :class:`pittgoogle.Schemas` instead. + This class is not intended to be used directly. Use :class:`pittgoogle.Schemas` instead. """ + """The name of the schema.""" name: str = field() + """The description of the schema.""" description: str = field() + """Name of the `Schema` helper method used to load the schema.""" + _helper: str = field(default="_local_avsc_helper") + """Path where the helper can find the schema.""" path: Optional["Path"] = field(default=None) + """Mapping of Pitt-Google's generic field names to survey-specific field names.""" _map: Optional[dict] = field(default=None, init=False) - _avsc: Optional[dict] = field(default=None, init=False) + """The Avro schema loaded by the helper or None if no Avro schema exists.""" + avsc: Optional[dict] = field(default=None, init=False) + + @classmethod + def from_yaml(cls, schema_dict: dict) -> "Schema": + # initialize the class, then let the helper finish up + schema = cls(**schema_dict) + helper = getattr(cls, schema._helper) + schema = helper(schema) + return schema + + @staticmethod + def _local_avsc_helper(schema: "Schema") -> "Schema": + """Resolve `schema.path`. If it points to a valid ".avsc" file, load it into `schema.avsc`.""" + # Resolve the path. 
If it is not None, this helper expects it to be the path to + # a ".avsc" file relative to the pittgoogle package directory. + schema.path = PACKAGE_DIR / schema.path if schema.path is not None else None + + # Load the avro schema, if the file exists. + invalid_path = ( + (schema.path is None) or (schema.path.suffix != ".avsc") or (not schema.path.is_file()) + ) + if invalid_path: + schema.avsc = None + else: + schema.avsc = fastavro.schema.load_schema(schema.path) + + return schema + + @staticmethod + def _lsst_avsc_helper(schema: "Schema") -> "Schema": + """Load the LSST Avro schema for the "major.minor" version given in `schema.path`.""" + import lsst.alert.packet.schema + + major, minor = schema.path.split(".") # [FIXME] assumes path is a "major.minor" str -- confirm + schema.path = lsst.alert.packet.schema.get_schema_path(major, minor) + schema.avsc = lsst.alert.packet.schema.Schema.from_file(schema.path) + + return schema @property def survey(self) -> str: @@ -51,32 +94,37 @@ def map(self) -> dict: raise ValueError(f"no schema map found for schema name '{self.name}'") return self._map - @property - def avsc(self) -> Optional[dict]: - """The Avro schema loaded from the file at `self.path`, or None if a valid file cannot be found.""" - # if the schema has already been loaded, return it - if self._avsc is not None: - return self._avsc + # @property + # def avsc(self) -> Optional[dict]: + # """The Avro schema loaded from the file at `self.path`, or None if a valid file cannot be found.""" + # # if the schema has already been loaded, return it + # if self._avsc is not None: + # return self._avsc - # if self.path does not point to an existing avro schema file, return None - if (self.path is None) or (self.path.suffix != ".avsc") or (not self.path.is_file()): - return None + # # if self.path does not point to an existing avro schema file, return None + # if (self.path is None) or (self.path.suffix != ".avsc") or (not self.path.is_file()): + # return None - # load the schema and return 
it - self._avsc = 
fastavro.schema.load_schema(self.path) - return self._avsc + # # load the schema and return it + # self._avsc = fastavro.schema.load_schema(self.path) + # return self._avsc @define(frozen=True) class PubsubMessageLike: """Container for an incoming Pub/Sub message that mimics a `google.cloud.pubsub_v1.types.PubsubMessage`. + This class is not intended to be used directly. + Use :class:`pittgoogle.Alert` instead. + + Purpose: It is convenient for the :class:`pittgoogle.Alert` class to work with a message as a - `pubsub_v1.types.PubsubMessage`. However, there are many ways to obtain an alert that do - not result in a `pubsub_v1.types.PubsubMessage` (e.g., an alert packet loaded from disk or - an incoming message to a Cloud Functions or Cloud Run module). In those cases, this class - is used to create an object with the same attributes as a `pubsub_v1.types.PubsubMessage`. - This object is then assigned to the `msg` attribute of the `Alert`. + `google.cloud.pubsub_v1.types.PubsubMessage`. However, there are many ways to obtain an alert + that do not result in a `google.cloud.pubsub_v1.types.PubsubMessage` (e.g., an alert packet + loaded from disk or an incoming message to a Cloud Functions or Cloud Run module). In those + cases, this class is used to create an object with the same attributes as a + `google.cloud.pubsub_v1.types.PubsubMessage`. This object is then assigned to the `msg` + attribute of the `Alert`. """ data: bytes = field() diff --git a/pyproject.toml b/pyproject.toml index 491d350..f9c76ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,10 @@ google-cloud-bigquery = ">=3.11.2" google-cloud-pubsub = ">=2.17.1" pandas = ">=1.5" # v1.5.1 required by supernnova v3.0.1 tabulate = ">=0.9" +# lsst-alert-packet for the LSST schema helper. +# The PyPI version of this looks out of date so install from a git tag instead. +# [FIXME] This is pinned... is there a way to get the latest tag automatically? 
+lsst-alert-packet = { git = "https://github.com/lsst/alert_packet.git", tag = "w.2024.26" } [tool.poetry.group.docs] optional = true