fixup documentation
troyraen committed Jun 29, 2024
1 parent ac7b436 commit 39a92a1
Showing 5 changed files with 137 additions and 182 deletions.
90 changes: 42 additions & 48 deletions pittgoogle/auth.py
@@ -1,41 +1,12 @@
# -*- coding: UTF-8 -*-
"""A class to handle authentication with Google Cloud.
.. contents::
:local:
:depth: 2
"""Classes to manage authentication with Google Cloud.
.. note::
To authenticate, you must have completed one of the setup options described in
:doc:`/main/one-time-setup/authentication`. The recommended workflow is to use a
:doc:`/main/one-time-setup/authentication`. The recommendation is to use a
:ref:`service account <service account>` and :ref:`set environment variables <set env vars>`.
In that case, you will not need to call this module directly.
Usage Example
--------------
The basic call is:
.. code-block:: python
import pittgoogle
myauth = pittgoogle.auth.Auth()
This will load authentication settings from your :ref:`environment variables <set env vars>`.
You can override this behavior with keyword arguments. This does not automatically load the
credentials. To do that, request them explicitly:
.. code-block:: python
myauth.credentials
It will first look for a service account key file, then fall back to OAuth2.
API
----
"""
import logging
import os
@@ -56,28 +27,51 @@

@define
class Auth:
"""Credentials for authentication to a Google Cloud project.
"""Credentials for authenticating with a Google Cloud project.
This class provides methods to obtain and load credentials from either a service account
key file or an OAuth2 session.
To authenticate, you must have completed one of the setup options described in
:doc:`/main/one-time-setup/authentication`.
Attributes
----------
GOOGLE_CLOUD_PROJECT : str
The project ID of the Google Cloud project to connect to. This can be set as an
environment variable.
GOOGLE_APPLICATION_CREDENTIALS : str
The path to a keyfile containing service account credentials. Either this or the
`OAUTH_CLIENT_*` settings are required for successful authentication.
OAUTH_CLIENT_ID : str
The client ID for an OAuth2 connection. Either this and `OAUTH_CLIENT_SECRET`, or
the `GOOGLE_APPLICATION_CREDENTIALS` setting, are required for successful
authentication.
OAUTH_CLIENT_SECRET : str
The client secret for an OAuth2 connection. Either this and `OAUTH_CLIENT_ID`, or
the `GOOGLE_APPLICATION_CREDENTIALS` setting, are required for successful
authentication.
Usage
-----
The basic call is:
.. code-block:: python
Missing parameters will be obtained from an environment variable of the same name,
if it exists.
myauth = pittgoogle.Auth()
:param GOOGLE_CLOUD_PROJECT:
Project ID of the Google Cloud project to connect to.
This will load authentication settings from your :ref:`environment variables <set env vars>`.
You can override this behavior with keyword arguments. This does not automatically load the
credentials. To do that, request them explicitly:
:param GOOGLE_APPLICATION_CREDENTIALS:
Path to a keyfile containing service account credentials.
Either this or both `OAUTH_CLIENT_*` settings are required for successful
authentication using `Auth`.
.. code-block:: python
:param OAUTH_CLIENT_ID:
Client ID for an OAuth2 connection.
Either this and `OAUTH_CLIENT_SECRET`, or the `GOOGLE_APPLICATION_CREDENTIALS`
setting, are required for successful authentication using `Auth`.
myauth.credentials
:param OAUTH_CLIENT_SECRET:
Client secret for an OAuth2 connection.
Either this and `OAUTH_CLIENT_ID`, or the `GOOGLE_APPLICATION_CREDENTIALS` setting,
are required for successful authentication using `Auth`.
It will first look for a service account key file, then fall back to OAuth2.
"""

GOOGLE_CLOUD_PROJECT = field(factory=lambda: os.getenv("GOOGLE_CLOUD_PROJECT", None))
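
The docstring above says that missing settings come from environment variables of the same name, and that a service account key file is tried before OAuth2. A minimal sketch of that behavior follows. It is not the pittgoogle implementation: `AuthSketch`, `_from_env`, and `credential_source` are hypothetical names, and the standard-library `dataclasses` module stands in for `attrs`.

```python
import os
from dataclasses import dataclass, field
from typing import Callable, Optional


def _from_env(name: str) -> Callable[[], Optional[str]]:
    """Return a zero-argument factory that reads `name` from the environment."""
    return lambda: os.getenv(name, None)


@dataclass
class AuthSketch:
    """Hypothetical stand-in for Auth (assumption, not the real class)."""

    # Each default is looked up when the instance is created, not at import time.
    GOOGLE_CLOUD_PROJECT: Optional[str] = field(
        default_factory=_from_env("GOOGLE_CLOUD_PROJECT")
    )
    GOOGLE_APPLICATION_CREDENTIALS: Optional[str] = field(
        default_factory=_from_env("GOOGLE_APPLICATION_CREDENTIALS")
    )
    OAUTH_CLIENT_ID: Optional[str] = field(default_factory=_from_env("OAUTH_CLIENT_ID"))
    OAUTH_CLIENT_SECRET: Optional[str] = field(
        default_factory=_from_env("OAUTH_CLIENT_SECRET")
    )

    def credential_source(self) -> str:
        """Report which settings would be used: key file first, then OAuth2."""
        if self.GOOGLE_APPLICATION_CREDENTIALS:
            return "service_account"
        if self.OAUTH_CLIENT_ID and self.OAUTH_CLIENT_SECRET:
            return "oauth2"
        raise ValueError(
            "Set GOOGLE_APPLICATION_CREDENTIALS, or both OAUTH_CLIENT_ID "
            "and OAUTH_CLIENT_SECRET."
        )
```

Keyword arguments override the environment, so `AuthSketch(GOOGLE_CLOUD_PROJECT="my-project")` ignores any exported `GOOGLE_CLOUD_PROJECT` value. Nothing is contacted until `credential_source()` is called, which mirrors the note that credentials are not loaded automatically.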
20 changes: 1 addition & 19 deletions pittgoogle/bigquery.py
@@ -1,29 +1,11 @@
# -*- coding: UTF-8 -*-
"""Classes to facilitate connections to BigQuery datasets and tables.
.. contents::
:local:
:depth: 2
.. note::
This module relies on :mod:`pittgoogle.auth` to authenticate API calls.
The examples given below assume the use of a :ref:`service account <service account>` and
:ref:`environment variables <set env vars>`. In this case, :mod:`pittgoogle.auth` does not
need to be called explicitly.
Usage Examples
---------------
.. code-block:: python
import pittgoogle
[TODO]
API
----
:ref:`environment variables <set env vars>`.
"""
import logging
from typing import Optional, Union
203 changes: 91 additions & 112 deletions pittgoogle/pubsub.py
@@ -1,83 +1,11 @@
# -*- coding: UTF-8 -*-
"""Classes to facilitate connections to Pub/Sub streams.
.. contents::
:local:
:depth: 2
.. note::
This module relies on :mod:`pittgoogle.auth` to authenticate API calls.
The examples given below assume the use of a :ref:`service account <service account>` and
:ref:`environment variables <set env vars>`. In this case, :mod:`pittgoogle.auth` does not
need to be called explicitly.
Usage Examples
---------------
.. code-block:: python
import pittgoogle
Create a subscription to the "ztf-loop" topic:
.. code-block:: python
# topic the subscription will be connected to
# only required if the subscription does not yet exist in Google Cloud
topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle)
# choose your own name for the subscription
subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf")
# make sure the subscription exists and we can connect to it. create it if necessary
subscription.touch()
Pull a small batch of alerts. Helpful for testing. Not recommended for long-running listeners.
.. code-block:: python
alerts = pittgoogle.pubsub.pull_batch(subscription, max_messages=4)
Open a streaming pull. Recommended for long-running listeners. This will pull and process
messages in the background, indefinitely. You must supply a callback that processes a single message.
It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`.
Optionally, you can provide a callback that processes a batch of messages. Note that messages are
acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended
to do as much processing as possible in the message callback and use a batch callback only when
necessary.
.. code-block:: python
def my_msg_callback(alert):
# process the message here. we'll just print the ID.
print(f"processing message: {alert.metadata['message_id']}")
# return a Response. include a result if using a batch callback.
return pittgoogle.pubsub.Response(ack=True, result=alert.dict)
def my_batch_callback(results):
# process the batch of results (list of results returned by my_msg_callback)
# we'll just print the number of results in the batch
print(f"batch processing {len(results)} results")
consumer = pittgoogle.pubsub.Consumer(
subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
)
# open the stream in the background and process messages through the callbacks
# this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
consumer.stream()
Delete the subscription from Google Cloud.
.. code-block:: python
subscription.delete()
API
----
:ref:`environment variables <set env vars>`.
"""
import datetime
import importlib.resources
@@ -334,25 +262,46 @@ def publish(self, alert: "Alert") -> int:

@define
class Subscription:
"""Basic attributes of a Pub/Sub subscription and methods to manage it.
"""Creates a Pub/Sub subscription and provides methods to manage it.
Parameters
-----------
name : `str`
Name of the Pub/Sub subscription.
auth : :class:`pittgoogle.auth.Auth`, optional
Credentials for the Google Cloud project that owns this subscription. If not provided,
it will be created from environment variables.
topic : :class:`pittgoogle.pubsub.Topic`, optional
Topic this subscription should be attached to. Required only when the subscription needs
to be created.
client : `pubsub_v1.SubscriberClient`, optional
Pub/Sub client that will be used to access the subscription. This kwarg is useful if you
want to reuse a client. If None, a new client will be created.
schema_name : `str`
One of "ztf", "ztf.lite", "elasticc.v0_9_1.alert", "elasticc.v0_9_1.brokerClassification".
Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.pubsub.Alert`
for unpacking. If not provided, some properties of the `Alert` may not be available.
Args:
name (str):
Name of the Pub/Sub subscription.
auth (pittgoogle.auth.Auth, optional):
Credentials for the Google Cloud project that owns this subscription. If not provided, it will be created
from environment variables.
topic (pittgoogle.pubsub.Topic, optional):
Topic this subscription should be attached to. Required only when the subscription needs to be created.
client (google.cloud.pubsub_v1.SubscriberClient, optional):
Pub/Sub client that will be used to access the subscription. This kwarg is useful if you want to
reuse a client. If None, a new client will be created.
schema_name (str):
Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.Alert` for unpacking.
If not provided, some properties of the Alert may not be available. For a list of schema names, see
:meth:`pittgoogle.Schemas.names`.
Usage:
Create a subscription to the "ztf-loop" topic:
.. code-block:: python
# topic the subscription will be connected to
# only required if the subscription does not yet exist in Google Cloud
topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle)
# choose your own name for the subscription
subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf")
# make sure the subscription exists and we can connect to it. create it if necessary
subscription.touch()
Pull a small batch of alerts. Helpful for testing. (For long-running listeners, see
:class:`pittgoogle.Consumer`.)
.. code-block:: python
alerts = subscription.pull_batch(max_messages=4)
"""

name: str = field()
@@ -481,27 +430,57 @@ def purge(self):
class Consumer:
"""Consumer class to pull a Pub/Sub subscription and process messages.
Parameters
-----------
subscription : `str` or :class:`pittgoogle.pubsub.Subscription`
Pub/Sub subscription to be pulled (it must already exist in Google Cloud).
msg_callback : `callable`
Function that will process a single message. It should accept a
:class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`.
batch_callback : `callable`, optional
Function that will process a batch of results. It should accept a list of the results
returned by the `msg_callback`.
batch_maxn : `int`, optional
Maximum number of messages in a batch. This has no effect if `batch_callback` is None.
batch_max_wait_between_messages : `int`, optional
Max number of seconds to wait between messages before processing a batch.
This has no effect if `batch_callback` is None.
max_backlog : `int`, optional
Maximum number of pulled but unprocessed messages before pausing the pull.
max_workers : `int`, optional
Maximum number of workers for the `executor`. This has no effect if an `executor` is provided.
executor : `concurrent.futures.ThreadPoolExecutor`, optional
Executor to be used by the Google API to pull and process messages in the background.
Args:
subscription (str or Subscription):
Pub/Sub subscription to be pulled (it must already exist in Google Cloud).
msg_callback (callable):
Function that will process a single message. It should accept an Alert and return a Response.
batch_callback (callable, optional):
Function that will process a batch of results. It should accept a list of the results
returned by the msg_callback.
batch_maxn (int, optional):
Maximum number of messages in a batch. This has no effect if batch_callback is None.
batch_max_wait_between_messages (int, optional):
Max number of seconds to wait between messages before processing a batch. This has
no effect if batch_callback is None.
max_backlog (int, optional):
Maximum number of pulled but unprocessed messages before pausing the pull.
max_workers (int, optional):
Maximum number of workers for the executor. This has no effect if an executor is provided.
executor (concurrent.futures.ThreadPoolExecutor, optional):
Executor to be used by the Google API to pull and process messages in the background.
Usage:
Open a streaming pull. Recommended for long-running listeners. This will pull and process
messages in the background, indefinitely. You must supply a callback that processes a single message.
It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`.
Optionally, you can provide a callback that processes a batch of messages. Note that messages are
acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended
to do as much processing as possible in the message callback and use a batch callback only when
necessary.
.. code-block:: python
def my_msg_callback(alert):
# process the message here. we'll just print the ID.
print(f"processing message: {alert.metadata['message_id']}")
# return a Response. include a result if using a batch callback.
return pittgoogle.pubsub.Response(ack=True, result=alert.dict)
def my_batch_callback(results):
# process the batch of results (list of results returned by my_msg_callback)
# we'll just print the number of results in the batch
print(f"batch processing {len(results)} results")
consumer = pittgoogle.pubsub.Consumer(
subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
)
# open the stream in the background and process messages through the callbacks
# this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
consumer.stream()
"""

_subscription: Union[str, Subscription] = field(validator=instance_of((str, Subscription)))
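
The Consumer docstring notes that messages are acknowledged before the batch callback runs. The ordering can be illustrated with a small, library-free sketch. Everything here is an assumption made for illustration: `ResponseSketch` and `run_once` are hypothetical names, plain strings stand in for alerts, and collecting results from unacknowledged messages is a choice of this sketch, not a statement about how pittgoogle behaves.

```python
from typing import Any, Callable, List, NamedTuple, Optional


class ResponseSketch(NamedTuple):
    """Hypothetical stand-in for Response: ack flag plus an optional result."""

    ack: bool
    result: Any = None


def run_once(
    messages: List[Any],
    msg_callback: Callable[[Any], ResponseSketch],
    batch_callback: Optional[Callable[[List[Any]], None]] = None,
) -> List[Any]:
    """Process one batch and return the messages that were acknowledged."""
    acked: List[Any] = []
    results: List[Any] = []
    for message in messages:
        response = msg_callback(message)
        if response.ack:
            # The acknowledgement happens here, per message. In Pub/Sub an
            # acknowledged message cannot be pulled again.
            acked.append(message)
        if batch_callback is not None and response.result is not None:
            results.append(response.result)
    # The batch callback only runs after every message above was handled,
    # so a failure here cannot bring the acknowledged messages back.
    if batch_callback is not None and results:
        batch_callback(results)
    return acked


seen: List[str] = []
acked = run_once(
    ["a", "b", "c"],
    msg_callback=lambda m: ResponseSketch(ack=(m != "b"), result=m.upper()),
    batch_callback=seen.extend,
)
```

Because the acknowledgement precedes the batch step, any work that must not be lost belongs in the message callback, which is the recommendation the docstring makes.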
4 changes: 2 additions & 2 deletions pittgoogle/types_.py
@@ -1,5 +1,5 @@
# -*- coding: UTF-8 -*-
"""Functions to support working with alerts and related data."""
"""Classes defining new types."""
import importlib.resources
import logging
from typing import TYPE_CHECKING, Optional
@@ -21,7 +21,7 @@ class Schema:
"""Class for an individual schema.
This class is not intended to be used directly.
Use `pittgoogle.registry.Schemas` instead.
Use :class:`pittgoogle.Schemas` instead.
"""

name: str = field()
2 changes: 1 addition & 1 deletion pittgoogle/utils.py
@@ -1,5 +1,5 @@
# -*- coding: UTF-8 -*-
"""Functions to support working with alerts and related data."""
"""Classes and functions to support working with alerts and related data."""
import json
import logging
from base64 import b64decode, b64encode
