Skip to content

Commit

Permalink
Rename PubSubInvalid -> CloudConnectionError (#48)
Browse files Browse the repository at this point in the history
* rename PubSubInvalid -> CloudConnectionError

* add docs faq section for GC services (still blank with "TODO"s)

* prep v0.3.6 update CHANGELOG
  • Loading branch information
troyraen authored Jul 1, 2024
1 parent 4572e27 commit 3133b0e
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 15 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

(none)

## \[v0.3.6\] - 2024-07-01

### Changed

- Renamed `exceptions.PubSubInvalid` -> `exceptions.CloudConnectionError`, repurposed for more general use.

## \[v0.3.5\] - 2024-07-01

### Added
Expand Down
2 changes: 1 addition & 1 deletion docs/source/faq/cost.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.. _cost:

Costs
--------------
======

The Pitt-Google Alert Broker makes data available in Google Cloud repositories.
The data are public and user-pays, meaning that anyone can access as much or little as they want, and everyone pays for what *they* use.
Expand Down
10 changes: 10 additions & 0 deletions docs/source/faq/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@ Frequently Asked Questions
==============================================

.. toctree::
:caption: Google Cloud Basics
:maxdepth: 1

cost
find-project-id

.. toctree::
:caption: Google Cloud Services
:maxdepth: 1

what-is-bigquery
what-is-cloud-run
what-is-cloud-storage
what-is-pubsub
4 changes: 4 additions & 0 deletions docs/source/faq/what-is-bigquery.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
What is BigQuery?
=================

Google Cloud's BigQuery is ... # [TODO]
4 changes: 4 additions & 0 deletions docs/source/faq/what-is-cloud-run.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
What is Cloud Run?
==================

Google Cloud's Cloud Run is ... # [TODO]
4 changes: 4 additions & 0 deletions docs/source/faq/what-is-cloud-storage.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
What is Cloud Storage?
======================

Google Cloud's Cloud Storage is ... # [TODO]
4 changes: 4 additions & 0 deletions docs/source/faq/what-is-pubsub.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
What is Pub/Sub?
=================

Google Cloud's Pub/Sub is ... # [TODO]
7 changes: 4 additions & 3 deletions pittgoogle/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ class BadRequest(Exception):
"""Raised when a Flask request json envelope (e.g., from Cloud Run) is invalid."""


class CloudConnectionError(Exception):
"""Raised when a problem is encountered while trying to a Google Cloud resource."""


class OpenAlertError(Exception):
"""Raised when unable to deserialize a Pub/Sub message payload."""


class PubSubInvalid(Exception):
"""Raised when an invalid Pub/Sub configuration is encountered."""

class SchemaNotFoundError(Exception):
"""Raised when a schema with a given name is not found in the registry."""
27 changes: 16 additions & 11 deletions pittgoogle/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
from attrs.validators import gt, instance_of, is_callable, optional
from google.api_core.exceptions import NotFound

# [FIXME] clean up these imports
from . import exceptions
from .alert import Alert
from .auth import Auth
from .exceptions import PubSubInvalid, SchemaNotFoundError

LOGGER = logging.getLogger(__name__)
PACKAGE_DIR = importlib.resources.files(__package__)
Expand Down Expand Up @@ -67,9 +68,13 @@ def pull_batch(
if isinstance(subscription, str):
subscription = Subscription(subscription, **subscription_kwargs)

response = subscription.client.pull(
{"subscription": subscription.path, "max_messages": max_messages}
)
try:
response = subscription.client.pull(
{"subscription": subscription.path, "max_messages": max_messages}
)
except NotFound:
msg = "Subscription not found. You may need to create one using `pittgoogle.Subscription.touch`."
raise exceptions.CloudConnectionError(msg)

alerts = [
Alert.from_msg(msg.message, schema_name=schema_name) for msg in response.received_messages
Expand Down Expand Up @@ -235,7 +240,7 @@ def publish(self, alert: "Alert") -> int:
# schema exists so we can be lenient and just fall back to json instead of raising an error.
try:
alert.schema
except SchemaNotFoundError:
except exceptions.SchemaNotFoundError:
avro_schema = None
else:
if alert.schema.survey in ["elasticc"]:
Expand Down Expand Up @@ -345,18 +350,18 @@ def touch(self) -> None:
`google.api_core.exceptions.NotFound`
if the subscription needs to be created but the topic does not exist in Google Cloud.
`pittgoogle.exceptions.PubSubInvalid`
`pittgoogle.exceptions.CloudConnectionError`
if the subscription exists but it is not attached to self.topic and self.topic is not None.
"""
try:
subscrip = self.client.get_subscription(subscription=self.path)
LOGGER.info(f"subscription exists: {self.path}")

except NotFound:
subscrip = self._create() # may raise TypeError or NotFound
subscrip = self._create() # may raise TypeError or (topic) NotFound
LOGGER.info(f"subscription created: {self.path}")

self._set_topic(subscrip.topic) # may raise PubSubInvalid
self._set_topic(subscrip.topic) # may raise CloudConnectionError

def _create(self) -> pubsub_v1.types.Subscription:
if self.topic is None:
Expand All @@ -377,10 +382,9 @@ def _set_topic(self, connected_topic_path) -> None:
"The subscription exists but is attached to a different topic.\n"
f"\tFound topic: {connected_topic_path}\n"
f"\tExpected topic: {self.topic.path}\n"
"Either point to the found topic using a keyword argument or"
"delete the existing subscription and try again."
"Either point to the found topic or delete the existing subscription and try again."
)
raise PubSubInvalid(msg)
raise exceptions.CloudConnectionError(msg)

# if the topic isn't already set, do it now
if self.topic is None:
Expand Down Expand Up @@ -410,6 +414,7 @@ def pull_batch(self, max_messages: int = 1) -> List["Alert"]:
max_messages : `int`
Maximum number of messages to be pulled.
"""
# Wrapping the module-level function
return pull_batch(self, max_messages=max_messages, schema_name=self.schema_name)

def purge(self):
Expand Down

0 comments on commit 3133b0e

Please sign in to comment.