diff --git a/CHANGELOG.md b/CHANGELOG.md index 613266d..84c5c6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## \[Unreleased\] -(none) + +### Changed + +- Make `Alert` method private, `add_id_attributes` -> `_add_id_attributes`. +- Update docstrings for clarity and accuracy. +- Improve type hints. +- Fix up Sphinx and rst to improve how docs are being rendered. ## \[v0.3.4\] - 2024-06-29 diff --git a/docs/source/api-reference/alert.rst b/docs/source/api-reference/alert.rst index 7939c8d..1ae7a78 100644 --- a/docs/source/api-reference/alert.rst +++ b/docs/source/api-reference/alert.rst @@ -3,5 +3,3 @@ pittgoogle.alert .. automodule:: pittgoogle.alert :members: - :private-members: - :member-order: bysource diff --git a/docs/source/api-reference/auth.rst b/docs/source/api-reference/auth.rst index 801208b..5d59a21 100644 --- a/docs/source/api-reference/auth.rst +++ b/docs/source/api-reference/auth.rst @@ -3,5 +3,3 @@ pittgoogle.auth .. automodule:: pittgoogle.auth :members: - :private-members: - :member-order: bysource diff --git a/docs/source/api-reference/exceptions.rst b/docs/source/api-reference/exceptions.rst index 00bfd19..237650f 100644 --- a/docs/source/api-reference/exceptions.rst +++ b/docs/source/api-reference/exceptions.rst @@ -3,5 +3,3 @@ pittgoogle.exceptions .. automodule:: pittgoogle.exceptions :members: - :private-members: - :member-order: bysource diff --git a/docs/source/api-reference/index.rst b/docs/source/api-reference/index.rst index ce3c81b..372ecaf 100644 --- a/docs/source/api-reference/index.rst +++ b/docs/source/api-reference/index.rst @@ -6,11 +6,13 @@ pittgoogle .. autosummary:: - pittgoogle.Alert - pittgoogle.Auth - pittgoogle.Consumer - pittgoogle.ProjectIds - pittgoogle.Schemas - pittgoogle.Subscription - pittgoogle.Table - pittgoogle.Topic + .. 
autosummary:: + + pittgoogle.alert.Alert + pittgoogle.auth.Auth + pittgoogle.bigquery.Table + pittgoogle.pubsub.Consumer + pittgoogle.pubsub.Subscription + pittgoogle.pubsub.Topic + pittgoogle.registry.ProjectIds + pittgoogle.registry.Schemas diff --git a/docs/source/api-reference/registry.rst b/docs/source/api-reference/registry.rst index 91c0572..57d302e 100644 --- a/docs/source/api-reference/registry.rst +++ b/docs/source/api-reference/registry.rst @@ -3,5 +3,3 @@ pittgoogle.registry .. automodule:: pittgoogle.registry :members: - :private-members: - :member-order: bysource diff --git a/docs/source/api-reference/types_.rst b/docs/source/api-reference/types_.rst index 32a97c1..96c8a36 100644 --- a/docs/source/api-reference/types_.rst +++ b/docs/source/api-reference/types_.rst @@ -3,5 +3,3 @@ pittgoogle.types_ .. automodule:: pittgoogle.types_ :members: - :private-members: - :member-order: bysource diff --git a/docs/source/api-reference/utils.rst b/docs/source/api-reference/utils.rst index a30f78e..4f37feb 100644 --- a/docs/source/api-reference/utils.rst +++ b/docs/source/api-reference/utils.rst @@ -3,4 +3,3 @@ pittgoogle.utils .. automodule:: pittgoogle.utils :members: - :member-order: bysource diff --git a/docs/source/index.rst b/docs/source/index.rst index 6354e66..7894714 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -18,7 +18,6 @@ If you run into trouble, please :maxdepth: 1 listings - Install one-time-setup/index faq/index for-developers/index diff --git a/docs/source/one-time-setup/install.rst b/docs/source/one-time-setup/install.rst index 2e512ec..863406e 100644 --- a/docs/source/one-time-setup/install.rst +++ b/docs/source/one-time-setup/install.rst @@ -1,7 +1,7 @@ .. _install: Install pittgoogle-client -------------------------- +========================= .. 
automodule:: pittgoogle diff --git a/pittgoogle/alert.py b/pittgoogle/alert.py index 68dcfb1..0b6e84d 100644 --- a/pittgoogle/alert.py +++ b/pittgoogle/alert.py @@ -9,13 +9,13 @@ from typing import TYPE_CHECKING, Any, Mapping, Union import fastavro +import google.cloud.pubsub_v1 from attrs import define, field from . import registry, types_, utils from .exceptions import BadRequest, OpenAlertError, SchemaNotFoundError if TYPE_CHECKING: - import google.cloud.pubsub_v1 import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run LOGGER = logging.getLogger(__name__) @@ -26,39 +26,37 @@ class Alert: """Container for an astronomical alert. - Instances of this class are returned by other calls like :meth:`pittgoogle.Subscription.pull_batch`, - so it is often not necessary to instantiate this directly. - In cases where you do want to create an `Alert` directly, use one of the `from_*` methods like - :meth:`pittgoogle.Alert.from_dict`. - - All parameters are keyword only. + To create an `Alert`, use one of the `from_*` methods like :meth:`pittgoogle.Alert.from_dict`. + Instances of this class are also returned by other calls like :meth:`pittgoogle.pubsub.Subscription.pull_batch`. Args: - bytes (bytes, optional): - The message payload, as returned by Pub/Sub. It may be Avro or JSON serialized depending - on the topic. dict (dict, optional): - The message payload as a dictionary. - metadata (dict, optional): - The message metadata. - msg (google.cloud.pubsub_v1.types.PubsubMessage, optional): - The Pub/Sub message object, documented at - ``__. + The alert data as a dictionary. If not provided, it will be loaded from the + attributes (dict, optional): + Attributes or custom metadata for the alert. schema_name (str): - Schema name of the alert. Used for unpacking. If not provided, some properties of the - `Alert` may not be available. See :meth:`pittgoogle.Schemas.names` for a list of options. + Name of the schema for the alert. 
This is used to deserialize the alert bytes. + See :meth:`pittgoogle.registry.Schemas.names` for a list of options. + If not provided, some properties of the `Alert` may not be available. + msg (PubsubMessageLike or google.cloud.pubsub_v1.types.PubsubMessage, optional): + The incoming Pub/Sub message object. This class is documented at + ``__. + path (pathlib.Path, optional): + Path to a file containing the alert data. + + ---- """ - # Use "Union" because " | " is throwing an error when combined with forward references. - msg: Union["google.cloud.pubsub_v1.types.PubsubMessage", types_.PubsubMessageLike, None] = ( - field(default=None) - ) - _attributes: Mapping[str, str] | None = field(default=None) _dict: Mapping | None = field(default=None) - _dataframe: Union["pd.DataFrame", None] = field(default=None) + _attributes: Mapping[str, str] | None = field(default=None) schema_name: str | None = field(default=None) - _schema: types_.Schema | None = field(default=None, init=False) + msg: google.cloud.pubsub_v1.types.PubsubMessage | types_.PubsubMessageLike | None = field( + default=None + ) path: Path | None = field(default=None) + # Use "Union" because " | " is throwing an error when combined with forward references. + _dataframe: Union["pd.DataFrame", None] = field(default=None) + _schema: types_.Schema | None = field(default=None, init=False) # ---- class methods ---- # @classmethod @@ -80,33 +78,34 @@ def from_cloud_run(cls, envelope: Mapping, schema_name: str | None = None) -> "A If the Pub/Sub message is invalid or missing. Example: + Code for a Cloud Run module that uses this method to open a ZTF alert: - .. code-block:: python + .. 
code-block:: python - import pittgoogle - # flask is used to work with HTTP requests, which trigger Cloud Run modules - # the request contains the Pub/Sub message, which contains the alert packet - import flask + import pittgoogle + # flask is used to work with HTTP requests, which trigger Cloud Run modules + # the request contains the Pub/Sub message, which contains the alert packet + import flask - app = flask.Flask(__name__) + app = flask.Flask(__name__) - # function that receives the request - @app.route("/", methods=["POST"]) - def index(): + # function that receives the request + @app.route("/", methods=["POST"]) + def index(): - try: - # unpack the alert - # if the request does not contain a valid message, this raises a `BadRequest` - alert = pittgoogle.Alert.from_cloud_run(envelope=flask.request.get_json(), schema_name="ztf") + try: + # unpack the alert + # if the request does not contain a valid message, this raises a `BadRequest` + alert = pittgoogle.Alert.from_cloud_run(envelope=flask.request.get_json(), schema_name="ztf") - except pittgoogle.exceptions.BadRequest as exc: - # return the error text and an HTTP 400 Bad Request code - return str(exc), 400 + except pittgoogle.exceptions.BadRequest as exc: + # return the error text and an HTTP 400 Bad Request code + return str(exc), 400 - # continue processing the alert - # when finished, return an empty string and an HTTP success code - return "", 204 + # continue processing the alert + # when finished, return an empty string and an HTTP success code + return "", 204 """ # check whether received message is valid, as suggested by Cloud Run docs if not envelope: @@ -219,22 +218,15 @@ def attributes(self) -> Mapping: @property def dict(self) -> Mapping: - """Return the alert data as a dictionary. + """Alert data as a dictionary. If this was not provided (typical case), this attribute will contain the deserialized - alert bytes stored in the incoming :attr:`Alert.msg.data` as a dictionary. 
+ alert bytes from :attr:`Alert.msg.data`. You may update this dictionary as desired. If you publish this alert using :attr:`pittgoogle.Topic.publish`, this dictionary will be sent as the outgoing Pub/Sub message's data payload. - Note: The following is required in order to deserialize the incoming alert bytes. - The bytes can be in either Avro or JSON format, depending on the topic. - If the alert bytes are Avro and contain the schema in the header, the deserialization can - be done without requiring :attr:`Alert.schema`. However, if the alert bytes are - schemaless Avro, the deserialization requires the :attr:`Alert.schema.avsc` attribute to - contain the schema definition. - Returns: dict: The alert data as a dictionary. @@ -355,7 +347,7 @@ def schema(self) -> types_.Schema: return self._schema # ---- methods ---- # - def add_id_attributes(self) -> None: + def _add_id_attributes(self) -> None: """Add the IDs ("alertid", "objectid", "sourceid") to :attr:`Alert.attributes`.""" ids = ["alertid", "objectid", "sourceid"] values = [self.get(id) for id in ids] diff --git a/pittgoogle/auth.py b/pittgoogle/auth.py index 6869671..9d00b15 100644 --- a/pittgoogle/auth.py +++ b/pittgoogle/auth.py @@ -10,82 +10,88 @@ """ import logging import os -from typing import TYPE_CHECKING, Union +import attrs import google.auth +import google.auth.credentials +import google.oauth2.credentials import google_auth_oauthlib.helpers -from attrs import define, field from requests_oauthlib import OAuth2Session -if TYPE_CHECKING: - import google.auth.credentials - import google.oauth2.credentials - - LOGGER = logging.getLogger(__name__) -@define +@attrs.define class Auth: """Credentials for authenticating with a Google Cloud project. - This class provides methods to obtain and load credentials from either a service account - key file or an OAuth2 session. - To authenticate, you must have completed one of the setup options described in - :doc:`/one-time-setup/authentication`. 
+ This class provides methods to load credentials from either a service account key file or an + OAuth2 session. To authenticate, you must have completed one of the setup options described + in :doc:`/one-time-setup/authentication`. - Attributes - ---------- - GOOGLE_CLOUD_PROJECT : str - The project ID of the Google Cloud project to connect to. This can be set as an - environment variable. + In typical use cases, the following arguments are set as environment variables instead of + being passed to `Auth` explicitly. - GOOGLE_APPLICATION_CREDENTIALS : str - The path to a keyfile containing service account credentials. Either this or the - `OAUTH_CLIENT_*` settings are required for successful authentication. + Args: + GOOGLE_CLOUD_PROJECT (str, optional): + The project ID of the Google Cloud project to connect to. - OAUTH_CLIENT_ID : str - The client ID for an OAuth2 connection. Either this and `OAUTH_CLIENT_SECRET`, or - the `GOOGLE_APPLICATION_CREDENTIALS` setting, are required for successful - authentication. + GOOGLE_APPLICATION_CREDENTIALS (str, optional): + The path to a keyfile containing service account credentials. Either this or the + `OAUTH_CLIENT_*` settings are required for successful authentication. - OAUTH_CLIENT_SECRET : str - The client secret for an OAuth2 connection. Either this and `OAUTH_CLIENT_ID`, or - the `GOOGLE_APPLICATION_CREDENTIALS` setting, are required for successful - authentication. + OAUTH_CLIENT_ID (str, optional): + The client ID for an OAuth2 connection. Either this and `OAUTH_CLIENT_SECRET`, or + the `GOOGLE_APPLICATION_CREDENTIALS` setting, are required for successful + authentication. + + OAUTH_CLIENT_SECRET (str, optional): + The client secret for an OAuth2 connection. Either this and `OAUTH_CLIENT_ID`, or + the `GOOGLE_APPLICATION_CREDENTIALS` setting, are required for successful + authentication. Example: - The basic call is: + The basic call is: - .. code-block:: python + .. 
code-block:: python - myauth = pittgoogle.Auth() + myauth = pittgoogle.Auth() - This will load authentication settings from your :ref:`environment variables `. - You can override this behavior with keyword arguments. This does not automatically load the - credentials. To do that, request them explicitly: + This will load authentication settings from your :ref:`environment variables `. + You can override this behavior with keyword arguments. This does not automatically load the + credentials. To do that, request them explicitly: - .. code-block:: python + .. code-block:: python - myauth.credentials + myauth.credentials - It will first look for a service account key file, then fallback to OAuth2. + It will first look for a service account key file, then fallback to OAuth2. """ - GOOGLE_CLOUD_PROJECT = field(factory=lambda: os.getenv("GOOGLE_CLOUD_PROJECT", None)) - GOOGLE_APPLICATION_CREDENTIALS = field( + # Strings _below_ the field will make these also show up as individual properties in rendered docs. 
+ GOOGLE_CLOUD_PROJECT: str | None = attrs.field( + factory=lambda: os.getenv("GOOGLE_CLOUD_PROJECT", None) + ) + """The project ID of the Google Cloud project to connect to.""" + GOOGLE_APPLICATION_CREDENTIALS: str | None = attrs.field( factory=lambda: os.getenv("GOOGLE_APPLICATION_CREDENTIALS", None) ) - OAUTH_CLIENT_ID = field(factory=lambda: os.getenv("OAUTH_CLIENT_ID", None)) - OAUTH_CLIENT_SECRET = field(factory=lambda: os.getenv("OAUTH_CLIENT_SECRET", None)) - _credentials = field(default=None, init=False) - _oauth2 = field(default=None, init=False) + """The path to a keyfile containing service account credentials.""" + OAUTH_CLIENT_ID: str | None = attrs.field(factory=lambda: os.getenv("OAUTH_CLIENT_ID", None)) + """The client ID for an OAuth2 connection.""" + OAUTH_CLIENT_SECRET: str | None = attrs.field( + factory=lambda: os.getenv("OAUTH_CLIENT_SECRET", None) + ) + """The client secret for an OAuth2 connection.""" + # The rest don't need string descriptions because they are explicitly defined as properties below. + _credentials = attrs.field(default=None, init=False) + _oauth2 = attrs.field(default=None, init=False) @property def credentials( self, - ) -> Union["google.auth.credentials.Credentials", "google.oauth2.credentials.Credentials"]: + ) -> google.auth.credentials.Credentials | google.oauth2.credentials.Credentials: """Credentials, loaded from a service account key file or an OAuth2 session.""" if self._credentials is None: self._credentials = self._get_credentials() @@ -93,7 +99,7 @@ def credentials( def _get_credentials( self, - ) -> Union["google.auth.credentials.Credentials", "google.oauth2.credentials.Credentials"]: + ) -> google.auth.credentials.Credentials | google.oauth2.credentials.Credentials: """Load user credentials from a service account key file or an OAuth2 session. Try the service account first, fall back to OAuth2. 
diff --git a/pittgoogle/bigquery.py b/pittgoogle/bigquery.py index 6d46d6d..3cbe921 100644 --- a/pittgoogle/bigquery.py +++ b/pittgoogle/bigquery.py @@ -1,18 +1,9 @@ # -*- coding: UTF-8 -*- -"""Classes to facilitate connections to BigQuery datasets and tables. - -.. note:: - - This module relies on :mod:`pittgoogle.auth` to authenticate API calls. - The examples given below assume the use of a :ref:`service account ` and - :ref:`environment variables `. -""" +"""Classes to facilitate connections to BigQuery datasets and tables.""" import logging -from typing import Optional, Union -import google.cloud.bigquery as bigquery -from attrs import define, field -from attrs.validators import instance_of, optional +import attrs +import google.cloud.bigquery from .alert import Alert from .auth import Auth @@ -20,77 +11,94 @@ LOGGER = logging.getLogger(__name__) -@define +@attrs.define class Table: """Methods and properties for a BigQuery table. - Parameters - ------------ - name : `str` - Name of the BigQuery table. - dataset : `str` - Name of the BigQuery dataset this table belongs to. - - projectid : `str`, optional - The table owner's Google Cloud project ID. Either this or `auth` is required. Note: - :attr:`pittgoogle.utils.ProjectIds` is a registry containing Pitt-Google's project IDs. - auth : :class:`pittgoogle.auth.Auth`, optional - Credentials for the Google Cloud project that owns this table. If not provided, - it will be created from environment variables when needed. - client : `bigquery.Client`, optional - BigQuery client that will be used to access the table. If not provided, a new client will - be created (using `auth`) the first time it is requested. + Args: + name (str): + Name of the BigQuery table. + dataset (str): + Name of the BigQuery dataset this table belongs to. + projectid (str, optional): + The table owner's Google Cloud project ID. Either this or `auth` is required. 
Note: + :attr:`pittgoogle.utils.ProjectIds` is a registry containing Pitt-Google's project IDs. + auth (Auth, optional): + Credentials for the Google Cloud project that owns this table. + If not provided, it will be created from environment variables when needed. + client (google.cloud.bigquery.Client, optional): + BigQuery client that will be used to access the table. + If not provided, a new client will be created the first time it is requested. + + ---- """ - name: str = field() - dataset: str = field() - _projectid: str = field(default=None) - _auth: Auth = field(default=None, validator=optional(instance_of(Auth))) - _client: Optional[bigquery.Client] = field( - default=None, validator=optional(instance_of(bigquery.Client)) + # Strings _below_ the field will make these also show up as individual properties in rendered docs. + name: str = attrs.field() + """Name of the BigQuery table.""" + dataset: str = attrs.field() + """Name of the BigQuery dataset this table belongs to.""" + # The rest don't need string descriptions because they are explicitly defined as properties below. + _projectid: str = attrs.field(default=None) + _auth: Auth = attrs.field( + default=None, validator=attrs.validators.optional(attrs.validators.instance_of(Auth)) ) - _table: Optional[bigquery.Table] = field(default=None, init=False) + _client: google.cloud.bigquery.Client | None = attrs.field( + default=None, + validator=attrs.validators.optional( + attrs.validators.instance_of(google.cloud.bigquery.Client) + ), + ) + _table: google.cloud.bigquery.Table | None = attrs.field(default=None, init=False) @classmethod def from_cloud( cls, name: str, *, - dataset: Optional[str] = None, - survey: Optional[str] = None, - testid: Optional[str] = None, + dataset: str | None = None, + survey: str | None = None, + testid: str | None = None, ): """Create a `Table` object using a `client` with implicit credentials. - Useful when creating a `Table` object from within a Cloud Run module or similar. 
- The table in Google BigQuery is expected to exist already. - The `projectid` will be retrieved from the `client`. - - Parameters - ---------- - name : `str` - Name of the table. - dataset : `str`, optional - Name of the dataset containing the table. Either this or a `survey` is required. If a - `testid` is provided, it will be appended to this name following the Pitt-Google naming syntax. - survey : `str`, optional - Name of the survey. This will be used as the name of the dataset if the `dataset` kwarg - is not provided. This kwarg is provided for convenience in cases where the Pitt-Google - naming syntax is used to name resources. - testid : `str`, optional - Pipeline identifier. If this is not `None`, `False`, or `"False"` it will be appended to - the dataset name. This is used in cases where the Pitt-Google naming syntax is used to name - resources. This allows pipeline modules to find the correct resources without interfering - with other pipelines that may have deployed resources with the same base names - (e.g., for development and testing purposes). + Use this method when creating a `Table` object in code running in Google Cloud (e.g., + in a Cloud Run module). The underlying Google APIs will automatically find your credentials. + + The table resource in Google BigQuery is expected to already exist. + + Args: + name (str): + Name of the table. + dataset (str, optional): + Name of the dataset containing the table. Either this or a `survey` is required. + If a `testid` is provided, it will be appended to this name following the Pitt-Google + naming syntax. + survey (str, optional): + Name of the survey. This will be used as the name of the dataset if the `dataset` + kwarg is not provided. This kwarg is provided for convenience in cases where the + Pitt-Google naming syntax is used to name resources. + testid (str, optional): + Pipeline identifier. If this is not `None`, `False`, or `"False"`, it will be + appended to the dataset name. 
This is used in cases where the Pitt-Google naming + syntax is used to name resources. This allows pipeline modules to find the correct + resources without interfering with other pipelines that may have deployed resources + with the same base names (e.g., for development and testing purposes). + + Returns: + Table: + The `Table` object. + + Raises: + NotFound: + # [TODO] Track down specific error raised when table doesn't exist; update this docstring. """ if dataset is None: - # [TODO] update the elasticc broker to name the dataset using the survey name only dataset = survey # if testid is not False, "False", or None, append it to the dataset if testid and testid != "False": dataset = f"{dataset}_{testid}" - client = bigquery.Client() + client = google.cloud.bigquery.Client() table = cls(name, dataset=dataset, projectid=client.project, client=client) # make the get request now to create a connection to the table _ = table.table @@ -100,7 +108,7 @@ def from_cloud( def auth(self) -> Auth: """Credentials for the Google Cloud project that owns this table. - This will be created from environment variables if `self._auth` is None. + This will be created using environment variables if necessary. """ if self._auth is None: self._auth = Auth() @@ -113,7 +121,7 @@ def auth(self) -> Auth: @property def id(self) -> str: - """Fully qualified table ID with syntax "projectid.dataset_name.table_name".""" + """Fully qualified table ID with syntax 'projectid.dataset_name.table_name'.""" return f"{self.projectid}.{self.dataset}.{self.name}" @property @@ -124,34 +132,43 @@ def projectid(self) -> str: return self._projectid @property - def table(self) -> bigquery.Table: - """Return a BigQuery Table object that's connected to the table. Makes a get request if necessary.""" + def table(self) -> google.cloud.bigquery.Table: + """Google Cloud BigQuery Table object that is connected to the Cloud resource. + + Makes a `get_table` request if necessary. 
+ + Returns: + google.cloud.bigquery.Table: + The BigQuery Table object, connected to the Cloud resource. + """ if self._table is None: self._table = self.client.get_table(self.id) return self._table @property - def client(self) -> bigquery.Client: - """BigQuery client for table access. + def client(self) -> google.cloud.bigquery.Client: + """Google Cloud BigQuery Client used to access the table. + + This will be created using :attr:`Table.auth` if necessary. - Will be created using `self.auth.credentials` if necessary. + Returns: + google.cloud.bigquery.Client: + The BigQuery client instance. """ if self._client is None: - self._client = bigquery.Client(credentials=self.auth.credentials) + self._client = google.cloud.bigquery.Client(credentials=self.auth.credentials) return self._client - def insert_rows(self, rows: Union[list[dict], list[Alert]]) -> list[dict]: - """Inserts rows into the BigQuery table. + def insert_rows(self, rows: list[dict | Alert]) -> list[dict]: + """Inserts the rows into the BigQuery table. - Parameters - ---------- - rows : list[dict] or list[Alert] - The rows to be inserted. Can be a list of dictionaries or a list of Alert objects. + Args: + rows (list[dict or Alert]): + The rows to be inserted. Can be a list of dictionaries or a list of Alert objects. - Returns - ------- - list[dict] - A list of errors encountered. + Returns: + list[dict]: + A list of errors encountered. """ # if elements of rows are Alerts, need to extract the dicts myrows = [row.dict if isinstance(row, Alert) else row for row in rows] diff --git a/pittgoogle/pubsub.py b/pittgoogle/pubsub.py index ade42fb..2bc9d53 100644 --- a/pittgoogle/pubsub.py +++ b/pittgoogle/pubsub.py @@ -282,26 +282,26 @@ class Subscription: Example: - Create a subscription to the "ztf-loop" topic: + Create a subscription to the "ztf-loop" topic: - .. code-block:: python + .. 
code-block:: python - # topic the subscription will be connected to - # only required if the subscription does not yet exist in Google Cloud - topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle) + # topic the subscription will be connected to + # only required if the subscription does not yet exist in Google Cloud + topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle) - # choose your own name for the subscription - subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf") + # choose your own name for the subscription + subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf") - # make sure the subscription exists and we can connect to it. create it if necessary - subscription.touch() + # make sure the subscription exists and we can connect to it. create it if necessary + subscription.touch() - Pull a small batch of alerts. Helpful for testing. (For long-runnining listeners, see - :class:`pittgoogle.Consumer`.) + Pull a small batch of alerts. Helpful for testing. (For long-running listeners, see + :class:`pittgoogle.Consumer`.) - .. code-block:: python + .. code-block:: python - alerts = subscription.pull_batch(subscription, max_messages=4) + alerts = subscription.pull_batch(subscription, max_messages=4) """ name: str = field() @@ -452,36 +452,35 @@ class Consumer: Example: - Open a streaming pull. Recommended for long-running listeners. This will pull and process - messages in the background, indefinitely. User must supply a callback that processes a single message. - It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`. - Optionally, can provide a callback that processes a batch of messages. 
Note that messages are - acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended - to do as much processing as possible in the message callback and use a batch callback only when - necessary. + Open a streaming pull. Recommended for long-running listeners. This will pull and process + messages in the background, indefinitely. User must supply a callback that processes a single message. + It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`. + Optionally, can provide a callback that processes a batch of messages. Note that messages are + acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended + to do as much processing as possible in the message callback and use a batch callback only when + necessary. - .. code-block:: python + .. code-block:: python - def my_msg_callback(alert): - # process the message here. we'll just print the ID. - print(f"processing message: {alert.metadata['message_id']}") + def my_msg_callback(alert): + # process the message here. we'll just print the ID. + print(f"processing message: {alert.metadata['message_id']}") - # return a Response. include a result if using a batch callback. - return pittgoogle.pubsub.Response(ack=True, result=alert.dict) + # return a Response. include a result if using a batch callback. 
+ return pittgoogle.pubsub.Response(ack=True, result=alert.dict) - def my_batch_callback(results): - # process the batch of results (list of results returned by my_msg_callback) - # we'll just print the number of results in the batch - print(f"batch processing {len(results)} results) + def my_batch_callback(results): + # process the batch of results (list of results returned by my_msg_callback) + # we'll just print the number of results in the batch + print(f"batch processing {len(results)} results") - consumer = pittgoogle.pubsub.Consumer( - subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback - ) - - # open the stream in the background and process messages through the callbacks - # this blocks indefinitely. use `Ctrl-C` to close the stream and unblock - consumer.stream() + consumer = pittgoogle.pubsub.Consumer( + subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback + ) + # open the stream in the background and process messages through the callbacks + # this blocks indefinitely. 
use `Ctrl-C` to close the stream and unblock + consumer.stream() """ _subscription: Union[str, Subscription] = field(validator=instance_of((str, Subscription))) diff --git a/pittgoogle/types_.py b/pittgoogle/types_.py index a7da725..5403ee5 100644 --- a/pittgoogle/types_.py +++ b/pittgoogle/types_.py @@ -1,17 +1,15 @@ # -*- coding: UTF-8 -*- """Classes defining new types.""" +import datetime import importlib.resources import logging -from typing import TYPE_CHECKING, Optional +from pathlib import Path +from typing import Optional import fastavro import yaml from attrs import define, field -if TYPE_CHECKING: - import datetime - from pathlib import Path - LOGGER = logging.getLogger(__name__) PACKAGE_DIR = importlib.resources.files(__package__) @@ -26,9 +24,9 @@ class Schema: name: str = field() description: str = field() - path: Optional["Path"] = field(default=None) - _map: Optional[dict] = field(default=None, init=False) - _avsc: Optional[dict] = field(default=None, init=False) + path: Path | None = field(default=None) + _map: dict | None = field(default=None, init=False) + _avsc: dict | None = field(default=None, init=False) @property def survey(self) -> str: @@ -69,7 +67,7 @@ def avsc(self) -> Optional[dict]: @define(frozen=True) class PubsubMessageLike: - """Container for an incoming Pub/Sub message that mimics a `google.cloud.pubsub_v1.types.PubsubMessage`. + """Container for an incoming alert. It is convenient for the :class:`pittgoogle.Alert` class to work with a message as a `pubsub_v1.types.PubsubMessage`. However, there are many ways to obtain an alert that do @@ -80,7 +78,12 @@ class PubsubMessageLike: """ data: bytes = field() + """Alert data as bytes. This is also known as the message "payload".""" attributes: dict = field(factory=dict) - message_id: Optional[str] = field(default=None) - publish_time: Optional["datetime.datetime"] = field(default=None) - ordering_key: Optional[str] = field(default=None) + """Alert attributes. 
This is custom metadata attached to the Pub/Sub message.""" + message_id: str | None = field(default=None) + """Pub/Sub ID of the published message.""" + publish_time: datetime.datetime | None = field(default=None) + """Timestamp of the published message.""" + ordering_key: str | None = field(default=None) + """Pub/Sub ordering key of the published message.""" diff --git a/pittgoogle/utils.py b/pittgoogle/utils.py index b4e9467..760e575 100644 --- a/pittgoogle/utils.py +++ b/pittgoogle/utils.py @@ -1,20 +1,20 @@ # -*- coding: UTF-8 -*- """Classes and functions to support working with alerts and related data.""" +import base64 import json import logging -from base64 import b64decode, b64encode from collections import OrderedDict from io import BytesIO +import astropy.table +import astropy.time +import attrs import fastavro -from astropy.table import Table -from astropy.time import Time -from attrs import define LOGGER = logging.getLogger(__name__) -@define +@attrs.define class Cast: """Methods to convert data types.""" @@ -22,64 +22,58 @@ class Cast: def bytes_to_b64utf8(bytes_data): """Convert bytes data to UTF-8. - Parameters - ----------- - bytes_data : `bytes` - Data to be converted to UTF-8. + Args: + bytes_data (bytes): + Data to be converted to UTF-8. - Returns - ----------- - data : `dict` - ``bytes_data`` converted to a string in UTF-8 format + Returns: + str: + The ``bytes_data`` converted to a string in UTF-8 format. """ if bytes_data is not None: - return b64encode(bytes_data).decode("utf-8") + return base64.b64encode(bytes_data).decode("utf-8") @staticmethod def json_to_dict(bytes_data): - """Convert json serialized bytes data to a dict. + """Converts JSON serialized bytes data to a dictionary. - Parameters - ----------- - bytes_data : `bytes` - Data to be converted to a dictionary. + Args: + bytes_data (bytes): + Data to be converted to a dictionary. Returns: - data : `dict` - ``bytes_data`` unpacked into a dictionary. 
+ dict: + The unpacked dictionary from the ``bytes_data``. """ if bytes_data is not None: return json.loads(bytes_data) @staticmethod def b64json_to_dict(bytes_data): - """Convert base64 encoded, json serialized bytes data to a dict. + """Converts base64 encoded, JSON serialized bytes data to a dictionary. - Parameters - ----------- - bytes_data : `Base64` - Data to be converted to a dictionary. + Args: + bytes_data (Base64): + Data to be converted to a dictionary. Returns: - data : `dict` - ``bytes_data`` unpacked into a dictionary. + dict: + The unpacked dictionary from the ``bytes_data``. """ if bytes_data is not None: - return Cast.json_to_dict(b64decode(bytes_data)) + return Cast.json_to_dict(base64.b64decode(bytes_data)) @staticmethod def avro_to_dict(bytes_data): - """Convert Avro serialized bytes data to a dict. The schema must be attached in the header. + """Converts Avro serialized bytes data to a dictionary. - Parameters - ------------ - bytes_data : `bytes` - Avro serialized bytes data to be converted to a dictionary + Args: + bytes_data (bytes): + Avro serialized bytes data to be converted to a dictionary. The schema must be attached in the header. - Returns - -------- - data : `dict` - ``bytes_data`` unpacked into a dictionary. + Returns: + dict: + The unpacked dictionary from the ``bytes_data``. """ if bytes_data is not None: with BytesIO(bytes_data) as fin: @@ -90,24 +84,32 @@ def avro_to_dict(bytes_data): @staticmethod def b64avro_to_dict(bytes_data): - """Convert base64 encoded, Avro serialized bytes data to a dict. + """Converts base64 encoded, Avro serialized bytes data to a dictionary. - Parameters - ----------- - bytes_data : `bytes`: - base64 encoded, Avro serialized bytes to be converted to a dictionary + Args: + bytes_data (bytes): + Base64 encoded, Avro serialized bytes data to be converted to a dictionary. - Returns - --------- - data : `dict` - ``bytes_data`` unpacked into a dictionary. 
+ Returns: + dict: + The unpacked dictionary from the ``bytes_data``. """ - return Cast.avro_to_dict(b64decode(bytes_data)) + return Cast.avro_to_dict(base64.b64decode(bytes_data)) # --- Work with alert dictionaries @staticmethod - def alert_dict_to_table(alert_dict: dict) -> Table: - """Package a ZTF alert dictionary into an Astopy Table.""" + def alert_dict_to_table(alert_dict: dict) -> astropy.table.Table: + """Package a ZTF alert dictionary into an Astropy Table. + + Args: + alert_dict (dict): + A dictionary containing ZTF alert information. + + Returns: + astropy.table.Table: + An Astropy Table containing the alert information. + + """ # collect rows for the table candidate = OrderedDict(alert_dict["candidate"]) rows = [candidate] @@ -117,7 +119,7 @@ def alert_dict_to_table(alert_dict: dict) -> Table: rows.append(prv_cand_tmp) # create and return the table - table = Table(rows=rows) + table = astropy.table.Table(rows=rows) table.meta["comments"] = f"ZTF objectId: {alert_dict['objectId']}" return table @@ -126,9 +128,12 @@ def _strip_cutouts_ztf(alert_dict: dict) -> dict: """Drop the cutouts from the alert dictionary. Args: - alert_dict: ZTF alert formated as a dict + alert_dict (dict): + ZTF alert formatted as a dictionary. + Returns: - `alert_data` with the cutouts (postage stamps) removed + dict: + The modified `alert_dict` with the cutouts (postage stamps) removed. """ cutouts = ["cutoutScience", "cutoutTemplate", "cutoutDifference"] alert_stripped = {k: v for k, v in alert_dict.items() if k not in cutouts} @@ -136,20 +141,18 @@ def _strip_cutouts_ztf(alert_dict: dict) -> dict: # dates @staticmethod - def jd_to_readable_date(jd): - """Convert a Julian date to a human readable string. - - Parameters - ----------- - jd : `float` - Datetime value in julian format - - Returns - -------- - date : `str` - ``jd`` in the format 'day mon year hour:min' + def jd_to_readable_date(jd) -> str: + """Converts a Julian date to a human-readable string. 
+
+ Args:
+ jd (float):
+ Datetime value in Julian format.
+
+ Returns:
+ str:
+ The ``jd`` in the format 'day mon year - hour:min:sec'.
"""
- return Time(jd, format="jd").strftime("%d %b %Y - %H:%M:%S")
+ return astropy.time.Time(jd, format="jd").strftime("%d %b %Y - %H:%M:%S")

# --- Survey-specific