Skip to content

Commit

Permalink
fixup documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
troyraen committed Jun 29, 2024
1 parent ac7b436 commit dc8652d
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pittgoogle/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: UTF-8 -*-
"""A class to handle authentication with Google Cloud.
"""Classes to manage authentication with Google Cloud.
.. contents::
:local:
Expand Down
3 changes: 1 addition & 2 deletions pittgoogle/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
This module relies on :mod:`pittgoogle.auth` to authenticate API calls.
The examples given below assume the use of a :ref:`service account <service account>` and
:ref:`environment variables <set env vars>`. In this case, :mod:`pittgoogle.auth` does not
need to be called explicitly.
:ref:`environment variables <set env vars>`.
Usage Examples
---------------
Expand Down
131 changes: 58 additions & 73 deletions pittgoogle/pubsub.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,11 @@
# -*- coding: UTF-8 -*-
"""Classes to facilitate connections to Pub/Sub streams.
.. contents::
:local:
:depth: 2
.. note::
This module relies on :mod:`pittgoogle.auth` to authenticate API calls.
The examples given below assume the use of a :ref:`service account <service account>` and
:ref:`environment variables <set env vars>`. In this case, :mod:`pittgoogle.auth` does not
need to be called explicitly.
Usage Examples
---------------
.. code-block:: python
import pittgoogle
Create a subscription to the "ztf-loop" topic:
.. code-block:: python
# topic the subscription will be connected to
# only required if the subscription does not yet exist in Google Cloud
topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle)
# choose your own name for the subscription
subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf")
# make sure the subscription exists and we can connect to it. create it if necessary
subscription.touch()
Pull a small batch of alerts. Helpful for testing. Not recommended for long-runnining listeners.
.. code-block:: python
alerts = pittgoogle.pubsub.pull_batch(subscription, max_messages=4)
Open a streaming pull. Recommended for long-runnining listeners. This will pull and process
messages in the background, indefinitely. User must supply a callback that processes a single message.
It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`.
Optionally, can provide a callback that processes a batch of messages. Note that messages are
acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended
to do as much processing as possible in the message callback and use a batch callback only when
necessary.
.. code-block:: python
def my_msg_callback(alert):
# process the message here. we'll just print the ID.
print(f"processing message: {alert.metadata['message_id']}")
# return a Response. include a result if using a batch callback.
return pittgoogle.pubsub.Response(ack=True, result=alert.dict)
def my_batch_callback(results):
# process the batch of results (list of results returned by my_msg_callback)
# we'll just print the number of results in the batch
print(f"batch processing {len(results)} results)
consumer = pittgoogle.pubsub.Consumer(
subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
)
# open the stream in the background and process messages through the callbacks
# this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
consumer.stream()
Delete the subscription from Google Cloud.
.. code-block:: python
subscription.delete()
API
----
:ref:`environment variables <set env vars>`.
"""
import datetime
import importlib.resources
Expand Down Expand Up @@ -353,6 +281,29 @@ class Subscription:
One of "ztf", "ztf.lite", "elasticc.v0_9_1.alert", "elasticc.v0_9_1.brokerClassification".
Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.pubsub.Alert`
for unpacking. If not provided, some properties of the `Alert` may not be available.
Usage
-----
Create a subscription to the "ztf-loop" topic:
.. code-block:: python
# topic the subscription will be connected to
# only required if the subscription does not yet exist in Google Cloud
topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle)
# choose your own name for the subscription
subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf")
# make sure the subscription exists and we can connect to it. create it if necessary
subscription.touch()
Pull a small batch of alerts. Helpful for testing. (For long-runnining listeners, see
:class:`pittgoogle.Consumer`.)
.. code-block:: python
alerts = subscription.pull_batch(subscription, max_messages=4)
"""

name: str = field()
Expand Down Expand Up @@ -502,6 +453,40 @@ class Consumer:
Maximum number of workers for the `executor`. This has no effect if an `executor` is provided.
executor : `concurrent.futures.ThreadPoolExecutor`, optional
Executor to be used by the Google API to pull and process messages in the background.
Usage
-----
Open a streaming pull. Recommended for long-runnining listeners. This will pull and process
messages in the background, indefinitely. User must supply a callback that processes a single message.
It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`.
Optionally, can provide a callback that processes a batch of messages. Note that messages are
acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended
to do as much processing as possible in the message callback and use a batch callback only when
necessary.
.. code-block:: python
def my_msg_callback(alert):
# process the message here. we'll just print the ID.
print(f"processing message: {alert.metadata['message_id']}")
# return a Response. include a result if using a batch callback.
return pittgoogle.pubsub.Response(ack=True, result=alert.dict)
def my_batch_callback(results):
# process the batch of results (list of results returned by my_msg_callback)
# we'll just print the number of results in the batch
print(f"batch processing {len(results)} results)
consumer = pittgoogle.pubsub.Consumer(
subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
)
# open the stream in the background and process messages through the callbacks
# this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
consumer.stream()
"""

_subscription: Union[str, Subscription] = field(validator=instance_of((str, Subscription)))
Expand Down

0 comments on commit dc8652d

Please sign in to comment.