diff --git a/pittgoogle/auth.py b/pittgoogle/auth.py index e436b43..eab661b 100644 --- a/pittgoogle/auth.py +++ b/pittgoogle/auth.py @@ -1,5 +1,5 @@ # -*- coding: UTF-8 -*- -"""A class to handle authentication with Google Cloud. +"""Classes to manage authentication with Google Cloud. .. contents:: :local: diff --git a/pittgoogle/bigquery.py b/pittgoogle/bigquery.py index f55ae3f..4172269 100644 --- a/pittgoogle/bigquery.py +++ b/pittgoogle/bigquery.py @@ -9,8 +9,7 @@ This module relies on :mod:`pittgoogle.auth` to authenticate API calls. The examples given below assume the use of a :ref:`service account ` and - :ref:`environment variables `. In this case, :mod:`pittgoogle.auth` does not - need to be called explicitly. + :ref:`environment variables `. Usage Examples --------------- diff --git a/pittgoogle/pubsub.py b/pittgoogle/pubsub.py index f949d9f..cafc466 100644 --- a/pittgoogle/pubsub.py +++ b/pittgoogle/pubsub.py @@ -1,83 +1,11 @@ # -*- coding: UTF-8 -*- """Classes to facilitate connections to Pub/Sub streams. -.. contents:: - :local: - :depth: 2 - .. note:: This module relies on :mod:`pittgoogle.auth` to authenticate API calls. The examples given below assume the use of a :ref:`service account ` and - :ref:`environment variables `. In this case, :mod:`pittgoogle.auth` does not - need to be called explicitly. - -Usage Examples ---------------- - -.. code-block:: python - - import pittgoogle - -Create a subscription to the "ztf-loop" topic: - -.. code-block:: python - - # topic the subscription will be connected to - # only required if the subscription does not yet exist in Google Cloud - topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle) - - # choose your own name for the subscription - subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf") - - # make sure the subscription exists and we can connect to it. create it if necessary - subscription.touch() - -Pull a small batch of alerts. Helpful for testing. Not recommended for long-runnining listeners. - -.. code-block:: python - - alerts = pittgoogle.pubsub.pull_batch(subscription, max_messages=4) - -Open a streaming pull. Recommended for long-runnining listeners. This will pull and process -messages in the background, indefinitely. User must supply a callback that processes a single message. -It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`. -Optionally, can provide a callback that processes a batch of messages. Note that messages are -acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended -to do as much processing as possible in the message callback and use a batch callback only when -necessary. - -.. code-block:: python - - def my_msg_callback(alert): - # process the message here. we'll just print the ID. - print(f"processing message: {alert.metadata['message_id']}") - - # return a Response. include a result if using a batch callback. - return pittgoogle.pubsub.Response(ack=True, result=alert.dict) - - def my_batch_callback(results): - # process the batch of results (list of results returned by my_msg_callback) - # we'll just print the number of results in the batch - print(f"batch processing {len(results)} results) - - consumer = pittgoogle.pubsub.Consumer( - subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback - ) - - # open the stream in the background and process messages through the callbacks - # this blocks indefinitely. use `Ctrl-C` to close the stream and unblock - consumer.stream() - -Delete the subscription from Google Cloud. - -.. code-block:: python - - subscription.delete() - -API ----- - + :ref:`environment variables `. """ import datetime import importlib.resources @@ -353,6 +281,29 @@ class Subscription: One of "ztf", "ztf.lite", "elasticc.v0_9_1.alert", "elasticc.v0_9_1.brokerClassification". Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.pubsub.Alert` for unpacking. If not provided, some properties of the `Alert` may not be available. + + Usage + ----- + Create a subscription to the "ztf-loop" topic: + + .. code-block:: python + + # topic the subscription will be connected to + # only required if the subscription does not yet exist in Google Cloud + topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle) + + # choose your own name for the subscription + subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf") + + # make sure the subscription exists and we can connect to it. create it if necessary + subscription.touch() + + Pull a small batch of alerts. Helpful for testing. (For long-runnining listeners, see + :class:`pittgoogle.Consumer`.) + + .. code-block:: python + + alerts = subscription.pull_batch(subscription, max_messages=4) """ name: str = field() @@ -502,6 +453,40 @@ class Consumer: Maximum number of workers for the `executor`. This has no effect if an `executor` is provided. executor : `concurrent.futures.ThreadPoolExecutor`, optional Executor to be used by the Google API to pull and process messages in the background. + + Usage + ----- + + Open a streaming pull. Recommended for long-runnining listeners. This will pull and process + messages in the background, indefinitely. User must supply a callback that processes a single message. + It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`. + Optionally, can provide a callback that processes a batch of messages. Note that messages are + acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended + to do as much processing as possible in the message callback and use a batch callback only when + necessary. + + .. code-block:: python + + def my_msg_callback(alert): + # process the message here. we'll just print the ID. + print(f"processing message: {alert.metadata['message_id']}") + + # return a Response. include a result if using a batch callback. + return pittgoogle.pubsub.Response(ack=True, result=alert.dict) + + def my_batch_callback(results): + # process the batch of results (list of results returned by my_msg_callback) + # we'll just print the number of results in the batch + print(f"batch processing {len(results)} results) + + consumer = pittgoogle.pubsub.Consumer( + subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback + ) + + # open the stream in the background and process messages through the callbacks + # this blocks indefinitely. use `Ctrl-C` to close the stream and unblock + consumer.stream() + """ _subscription: Union[str, Subscription] = field(validator=instance_of((str, Subscription)))