From 017204c88c94e960a74c0f6df175b26965f355d4 Mon Sep 17 00:00:00 2001 From: Troy Raen Date: Mon, 22 Jul 2024 08:34:43 -0700 Subject: [PATCH] Add more BigQuery support. Fix up documentation. (#60) * add Table.schema property * add bigquery.Client class * add Table.query method * add Table.__getattr__ * don't change the project id * improve exception handling * Remove Table.auth and simplify Table.client * add dependency on db-dtypes * fix up poetry docs --- CHANGELOG.md | 17 +- .../manage-dependencies-poetry.md | 43 +-- pittgoogle/bigquery.py | 324 +++++++++++++++--- pittgoogle/exceptions.py | 4 +- pittgoogle/pubsub.py | 147 ++++---- pittgoogle/schema.py | 12 +- poetry.lock | 78 ++++- pyproject.toml | 1 + 8 files changed, 467 insertions(+), 159 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c90054c..826799f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## \[Unreleased\] -(none) +### Added + +- Add `bigquery.Client` class. +- Add class attributes `Table.query`, `Table.schema`. +- Add dependency on 'db-dtypes' to support BigQuery -> Pandas. + +### Changed + +- Remove `Table.auth` and simplify `Table.client`. This functionality is now managed by + `bigquery.Client`. +- In `Table` and `Topic`, the project ID is no longer changed away from what the user provided. + It was more confusing and dangerous than it was helpful. + +### Removed + +- `Table.auth` ## \[v0.3.9\] - 2024-07-02 diff --git a/docs/source/for-developers/manage-dependencies-poetry.md b/docs/source/for-developers/manage-dependencies-poetry.md index 3cfc2ed..3f841d2 100644 --- a/docs/source/for-developers/manage-dependencies-poetry.md +++ b/docs/source/for-developers/manage-dependencies-poetry.md @@ -17,26 +17,11 @@ conda activate poetry-py311 pip install poetry ``` -If you come back to this later you may need to reactivate your environment. - -```bash -conda activate poetry-py311 -``` - -If you want to start over with a fresh environment, deactivate the environment and remove it. - -```bash -conda deactivate -conda remove --name poetry-py311 --all -``` - ## Install existing dependencies This repo already contains a poetry.lock file, so running `poetry install` will give you the exact versions specified there ([Poetry install dependencies](https://python-poetry.org/docs/basic-usage/#installing-dependencies)). -If you would rather start over completely, skip ahead to the next section. - ```bash poetry install ``` @@ -47,6 +32,20 @@ If you want to install the docs dependencies as well, use: poetry install --extras=docs ``` +## Add a Dependency + +Here are two examples +([Poetry add dependencies](https://python-poetry.org/docs/managing-dependencies/#adding-a-dependency-to-a-group), +see also: [Poetry version-constraint syntax](https://python-poetry.org/docs/dependency-specification/)): + +```bash +# This example adds pandas to the main dependencies. +poetry add pandas + +# This example adds sphinx to the docs dependencies. +poetry add sphinx --group docs.dependencies +``` + ## Update Dependency Versions To upgrade to the latest versions compatible with the pyproject.toml file, you have two options below @@ -62,17 +61,3 @@ poetry update ``` Now commit the updated poetry.lock file to the repo. 
- -## Add a Dependency - -Here are two examples -([Poetry add dependencies](https://python-poetry.org/docs/managing-dependencies/#adding-a-dependency-to-a-group), -see also: [Poetry version-constraint syntax](https://python-poetry.org/docs/dependency-specification/)): - -```bash -# This example adds pandas to the main dependencies. -poetry add pandas - -# This example adds sphinx to the docs dependencies. -poetry add sphinx --group docs.dependencies -``` diff --git a/pittgoogle/bigquery.py b/pittgoogle/bigquery.py index 3cbe921..d7959f0 100644 --- a/pittgoogle/bigquery.py +++ b/pittgoogle/bigquery.py @@ -1,6 +1,7 @@ # -*- coding: UTF-8 -*- """Classes to facilitate connections to BigQuery datasets and tables.""" import logging +from typing import TYPE_CHECKING, Optional import attrs import google.cloud.bigquery @@ -8,12 +9,172 @@ from .alert import Alert from .auth import Auth +if TYPE_CHECKING: + import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run + LOGGER = logging.getLogger(__name__) +@attrs.define +class Client: + """A client for interacting with Google BigQuery. + + ---- + """ + + _auth: Auth = attrs.field( + default=None, validator=attrs.validators.optional(attrs.validators.instance_of(Auth)) + ) + _client: google.cloud.bigquery.Client | None = attrs.field(default=None) + + def __getattr__(self, attr): + """If ``attr`` doesn't exist in this class, try getting it from the underlying ``google.cloud.bigquery.Client``. + + Raises: + AttributeError: + if ``attr`` doesn't exist in either the pittgoogle or google.cloud API. + """ + try: + return getattr(self.client, attr) + except AttributeError as excep: + msg = f"Neither 'pittgoogle.bigquery.Client' nor 'google.cloud.bigquery.Client' has attribute '{attr}'" + raise AttributeError(msg) from excep + + @property + def auth(self) -> Auth: + """Credentials for the Google Cloud project that this client will be connected to. + + This will be created using environment variables if necessary. + """ + if self._auth is None: + self._auth = Auth() + return self._auth + + @property + def client(self) -> google.cloud.bigquery.Client: + """Google Cloud BigQuery client. + + If the client has not been initialized yet, it will be created using :attr:`Client.auth`. + + Returns: + google.cloud.bigquery.Client: + An instance of the Google Cloud BigQuery client. + """ + if self._client is None: + self._client = google.cloud.bigquery.Client(credentials=self.auth.credentials) + return self._client + + def list_table_names(self, dataset: str, projectid: str | None = None) -> list[str]: + """Get the names of the tables in the dataset. + + Args: + dataset (str): + The name of the dataset. + projectid (str, optional): + The dataset owner's Google Cloud project ID. If None, + :attr:`Client.client.project` will be used. + + Returns: + list[str]: + A list of table names in the dataset. + + Example: + + .. code-block:: python + + bqclient = pittgoogle.bigquery.Client() + bqclient.list_table_names(dataset="ztf", projectid=pittgoogle.ProjectIds().pittgoogle) + """ + project = projectid or self.client.project + return sorted([tbl.table_id for tbl in self.client.list_tables(f"{project}.{dataset}")]) + + def query( + self, query: str, to_dataframe: bool = True, to_dataframe_kwargs: dict | None = None, **job_config_kwargs + ): + """Submit a BigQuery query job. + + Args: + query (str): + The SQL query to execute. 
+ to_dataframe (bool, optional): + Whether to fetch the results and return them as a pandas DataFrame (True, default) or + just return the query job (False). + to_dataframe_kwargs (dict, optional): + Keyword arguments to be passed to ``google.cloud.bigquery.QueryJob.to_dataframe``. + Notable options: ``dtypes`` (dict), ``max_results`` (int), ``create_bqstorage_client`` (bool). + This is ignored unless ``to_dataframe`` is True. + ``create_bqstorage_client`` controls whether to use `google.cloud.bigquery_storage` (True) + or `google.cloud.bigquery` (False). `bigquery_storage` can be faster but is not necessary. + If you do not specify this parameter, pittgoogle will set it to True if the `bigquery_storage` + library is installed, else False. + **job_config_kwargs: + Keyword arguments to pass to the `google.cloud.bigquery.QueryJobConfig` constructor. + Notable option: ``dry_run`` (bool). + + Returns: + pandas.DataFrame if ``to_dataframe`` is True, else google.cloud.bigquery.QueryJob + + Example: + + Query two tables (ztf.alerts_v4_02 and ztf.alerts_v3_3) for data on one object (ZTF19acfixfe). + + .. code-block:: python + + bqclient = pittgoogle.bigquery.Client() + pittgoogle_project = pittgoogle.ProjectIds().pittgoogle + + sql = f\"\"\" + SELECT objectId, candid, candidate.jd, candidate.fid, candidate.magpsf + FROM `{pittgoogle_project}.ztf.alerts_v3_3` + WHERE objectId = 'ZTF19acfixfe' + UNION ALL + SELECT objectId, candid, candidate.jd, candidate.fid, candidate.magpsf + FROM `{pittgoogle_project}.ztf.alerts_v4_02` + WHERE objectId = 'ZTF19acfixfe' + \"\"\" + + diaobject_df = bqclient.query(query=sql) + """ + # Submit + job_config = google.cloud.bigquery.QueryJobConfig(**job_config_kwargs) + query_job = self.client.query(query, job_config=job_config) + + # Return + if job_config.dry_run: + print(f"This query will process {query_job.total_bytes_processed:,} bytes") + return query_job + + if to_dataframe: + kwargs = to_dataframe_kwargs.copy() if to_dataframe_kwargs else {} + # Google sets 'create_bqstorage_client' to True by default and then raises a warning if the + # 'bigquery_storage' library is not installed. Most pittgoogle users are not likely to have + # this installed or even know what it is. Let's avoid the warning and just quietly check for it. + create_bqstorage_client = self._check_bqstorage_client(kwargs.pop("create_bqstorage_client", None)) + return query_job.to_dataframe(create_bqstorage_client=create_bqstorage_client, **kwargs) + + return query_job + + @staticmethod + def _check_bqstorage_client(user_value: bool | None) -> bool: + """If ``user_value`` is None, check whether ``google.cloud.bigquery_storage`` is installed by trying to import it. + + Returns: + bool: + ``user_value`` if it is not None. Else, True (False) if the import is (is not) successful. + """ + if user_value is not None: + return user_value + + try: + import google.cloud.bigquery_storage # noqa: W0611 + except ModuleNotFoundError: + return False + return True + + @attrs.define class Table: - """Methods and properties for a BigQuery table. + """Methods and properties for interacting with a Google BigQuery table. Args: name (str): @@ -21,11 +182,7 @@ class Table: dataset (str): Name of the BigQuery dataset this table belongs to. projectid (str, optional): - The table owner's Google Cloud project ID. Either this or `auth` is required. Note: - :attr:`pittgoogle.utils.ProjectIds` is a registry containing Pitt-Google's project IDs. 
- auth (Auth, optional): - Credentials for the Google Cloud project that owns this table. - If not provided, it will be created from environment variables when needed. + The table owner's Google Cloud project ID. If not provided, the client's project ID will be used. client (google.cloud.bigquery.Client, optional): BigQuery client that will be used to access the table. If not provided, a new client will be created the first time it is requested. @@ -38,18 +195,12 @@ class Table: """Name of the BigQuery table.""" dataset: str = attrs.field() """Name of the BigQuery dataset this table belongs to.""" + client: Client | None = attrs.field(factory=Client) + """BigQuery client used to access the table.""" # The rest don't need string descriptions because they are explicitly defined as properties below. _projectid: str = attrs.field(default=None) - _auth: Auth = attrs.field( - default=None, validator=attrs.validators.optional(attrs.validators.instance_of(Auth)) - ) - _client: google.cloud.bigquery.Client | None = attrs.field( - default=None, - validator=attrs.validators.optional( - attrs.validators.instance_of(google.cloud.bigquery.Client) - ), - ) _table: google.cloud.bigquery.Table | None = attrs.field(default=None, init=False) + _schema: Optional["pd.DataFrame"] = attrs.field(default=None, init=False) @classmethod def from_cloud( @@ -60,9 +211,9 @@ def from_cloud( survey: str | None = None, testid: str | None = None, ): - """Create a `Table` object using a `client` with implicit credentials. + """Create a :class:`Table` object using a BigQuery client with implicit credentials. - Use this method when creating a `Table` object in code running in Google Cloud (e.g., + Use this method when creating a :class:`Table` object in code running in Google Cloud (e.g., in a Cloud Run module). The underlying Google APIs will automatically find your credentials. The table resource in Google BigQuery is expected to already exist. @@ -88,52 +239,50 @@ def from_cloud( Returns: Table: The `Table` object. - - Raises: - NotFound: - # [TODO] Track down specific error raised when table doesn't exist; update this docstring. """ if dataset is None: dataset = survey # if testid is not False, "False", or None, append it to the dataset if testid and testid != "False": dataset = f"{dataset}_{testid}" - client = google.cloud.bigquery.Client() - table = cls(name, dataset=dataset, projectid=client.project, client=client) - # make the get request now to create a connection to the table + # create a client with implicit credentials + client = Client(client=google.cloud.bigquery.Client()) + table = cls(name=name, dataset=dataset, projectid=client.project, client=client) + # make the get request now to fail early if there's a problem _ = table.table return table - @property - def auth(self) -> Auth: - """Credentials for the Google Cloud project that owns this table. + def __getattr__(self, attr): + """If ``attr`` doesn't exist in this class, try getting it from the underlying ``google.cloud.bigquery.Table``. - This will be created using environment variables if necessary. + Raises: + AttributeError: + if ``attr`` doesn't exist in either the pittgoogle or google.cloud API. 
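+
+        Example:
+
+            A minimal sketch of the pass-through (assumes the 'ztf.alerts_v4_02' table used in
+            other examples in this module; ``num_rows`` is defined by ``google.cloud.bigquery.Table``,
+            not by this class):
+
+            .. code-block:: python
+
+                tbl = pittgoogle.Table(
+                    name="alerts_v4_02", dataset="ztf", projectid=pittgoogle.ProjectIds().pittgoogle
+                )
+                # 'num_rows' is not defined on pittgoogle's Table, so the lookup is
+                # forwarded to the underlying google.cloud.bigquery.Table.
+                tbl.num_rows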
""" - if self._auth is None: - self._auth = Auth() - - if (self._projectid != self._auth.GOOGLE_CLOUD_PROJECT) and (self._projectid is not None): - LOGGER.warning(f"setting projectid to match auth: {self._auth.GOOGLE_CLOUD_PROJECT}") - self._projectid = self._auth.GOOGLE_CLOUD_PROJECT - - return self._auth + try: + return getattr(self.table, attr) + except AttributeError as excep: + msg = f"Neither 'pittgoogle.bigquery.Table' nor 'google.cloud.bigquery.Table' has attribute '{attr}'" + raise AttributeError(msg) from excep @property def id(self) -> str: - """Fully qualified table ID with syntax 'projectid.dataset_name.table_name'.""" + """Fully qualified table ID with syntax 'projectid.dataset.name'.""" return f"{self.projectid}.{self.dataset}.{self.name}" @property def projectid(self) -> str: - """The table owner's Google Cloud project ID.""" + """The table owner's Google Cloud project ID. + + Defaults to :attr:`Table.client.client.project`. + """ if self._projectid is None: - self._projectid = self.auth.GOOGLE_CLOUD_PROJECT + self._projectid = self.client.client.project return self._projectid @property def table(self) -> google.cloud.bigquery.Table: - """Google Cloud BigQuery Table object that is connected to the Cloud resource. + """Google Cloud BigQuery Table object. Makes a `get_table` request if necessary. @@ -146,21 +295,29 @@ def table(self) -> google.cloud.bigquery.Table: return self._table @property - def client(self) -> google.cloud.bigquery.Client: - """Google Cloud BigQuery Client used to access the table. + def schema(self) -> "pd.DataFrame": + """Schema of the BigQuery table.""" + if self._schema is None: + # [TODO] Wondering, should we avoid pandas here? Maybe make this a dict instead? + import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run - This will be created using :attr:`Table.auth` if necessary. + fields = [] + for field in self.table.schema: + fld = field.to_api_repr() # dict - Returns: - google.cloud.bigquery.Client: - The BigQuery client instance. - """ - if self._client is None: - self._client = google.cloud.bigquery.Client(credentials=self.auth.credentials) - return self._client + child_fields = fld.pop("fields", []) + # Append parent field name so that the child field name has the syntax 'parent_name.child_name'. + # This is the syntax that should be used in SQL queries and also the one shown on BigQuery Console page. + # The dicts update in place. + _ = [cfld.update(name=f"{fld['name']}.{cfld['name']}") for cfld in child_fields] + + fields.extend([fld] + child_fields) + self._schema = pd.DataFrame(fields) + + return self._schema def insert_rows(self, rows: list[dict | Alert]) -> list[dict]: - """Inserts the rows into the BigQuery table. + """Insert rows into the BigQuery table. Args: rows (list[dict or Alert]): @@ -176,3 +333,68 @@ def insert_rows(self, rows: list[dict | Alert]) -> list[dict]: if len(errors) > 0: LOGGER.warning(f"BigQuery insert error: {errors}") return errors + + def query( + self, + *, + columns: list[str] | None = None, + where: str | None = None, + limit: int | str | None = None, + to_dataframe: bool = True, + dry_run: bool = False, + return_sql: bool = False, + ): + """Submit a BigQuery query job. Against this table. + + This method supports basic queries against this table. For more complex queries or queries + against multiple tables, use :attr:`Client.query`. + + Args: + columns (list[str], optional): + List of columns to select. If None, all columns are selected. 
+ where (str, optional): + SQL WHERE clause. + limit (int or str, optional): + Maximum number of rows to return. + to_dataframe (bool, optional): + Whether to fetch the results and return them as a pandas DataFrame (True, default) or + just return the query job (False). + dry_run (bool, optional): + Whether to do a dry-run only to check whether the query is valid and estimate costs. + return_sql (bool, optional): + If True, the SQL query string will be returned. The query job will not be submitted. + + Returns: + pandas.DataFrame, google.cloud.bigquery.QueryJob, or str: + The SQL query string if ``return_sql`` is True. Otherwise, the results in a DataFrame + if ``to_dataframe`` is True, else the query job. + + Example: + + .. code-block:: python + + alerts_tbl = pittgoogle.Table( + name="alerts_v4_02", dataset="ztf", projectid=pittgoogle.ProjectIds().pittgoogle + ) + columns = ["objectId", "candid", "candidate.jd", "candidate.fid", "candidate.magpsf"] + where = "objectId IN ('ZTF18aarunfu', 'ZTF24aavyicb', 'ZTF24aavzkuf')" + + diaobjects_df = alerts_tbl.query(columns=columns, where=where) + """ + # We could use parameterized queries, but accounting for all input possibilities would take a good amount of + # work which should not be necessary. This query will be executed with the user's credentials/permissions. + # No special access is added here. The user can already submit arbitrary SQL queries using 'Table.client.query', + # so there's no point in trying to protect against SQL injection here. + + # Construct the SQL statement + sql = f"SELECT {', '.join(columns) if columns else '*'}" + sql += f" FROM `{self.table.full_table_id.replace(':', '.')}`" + if where is not None: + sql += f" WHERE {where}" + if limit is not None: + sql += f" LIMIT {limit}" + if return_sql: + return sql + + # Do the query + return self.client.query(query=sql, dry_run=dry_run, to_dataframe=to_dataframe) diff --git a/pittgoogle/exceptions.py b/pittgoogle/exceptions.py index fe3f909..653308a 100644 --- a/pittgoogle/exceptions.py +++ b/pittgoogle/exceptions.py @@ -4,8 +4,8 @@ class BadRequest(Exception): class CloudConnectionError(Exception): - """Raised when a problem is encountered while trying to a Google Cloud resource.""" + """Raised when a problem is encountered while trying to connect to a Google Cloud resource.""" class SchemaError(Exception): - """Raised when a schema with a given name is not found in the registry.""" + """Raised when the schema cannot be found in the registry or is incompatible with the data.""" diff --git a/pittgoogle/pubsub.py b/pittgoogle/pubsub.py index 300815d..abb0560 100644 --- a/pittgoogle/pubsub.py +++ b/pittgoogle/pubsub.py @@ -1,12 +1,5 @@ # -*- coding: UTF-8 -*- -"""Classes to facilitate connections to Google Cloud Pub/Sub streams. - -.. note:: - - This module relies on :mod:`pittgoogle.auth` to authenticate API calls. - The examples given below assume the use of a :ref:`service account ` and - :ref:`environment variables `. -""" +"""Classes to facilitate connections to Google Cloud Pub/Sub streams.""" import concurrent.futures import datetime import importlib.resources @@ -49,7 +42,8 @@ def pull_batch( Args: subscription (str or Subscription): - The subscription to be pulled. If str, the name of the subscription. + The subscription to be pulled. If str, the name of the subscription. The subscription is + expected to exist in Google Cloud. max_messages (int): The maximum number of messages to be pulled. schema_name (str): @@ -57,7 +51,7 @@ def pull_batch( for the list of options. 
Passed to Alert for unpacking. If not provided, some properties of the Alert may not be available. **subscription_kwargs: - Keyword arguments used to create the :class:`pittgoogle.pubsub.Subscription` object, if needed. + Keyword arguments used to create the :class:`Subscription` object, if needed. Returns: list[Alert]: @@ -70,9 +64,9 @@ def pull_batch( response = subscription.client.pull( {"subscription": subscription.path, "max_messages": max_messages} ) - except google.api_core.exceptions.NotFound: - msg = "Subscription not found. You may need to create one using `pittgoogle.Subscription.touch`." - raise exceptions.CloudConnectionError(msg) + except google.api_core.exceptions.NotFound as excep: + msg = f"NotFound: {subscription.path}. You may need to create the subscription using `pittgoogle.Subscription.touch`." + raise exceptions.CloudConnectionError(msg) from excep alerts = [ Alert.from_msg(msg.message, schema_name=schema_name) for msg in response.received_messages @@ -93,8 +87,8 @@ class Topic: name (str): Name of the Pub/Sub topic. projectid (str, optional): - The topic owner's Google Cloud project ID. Either this or `auth` is required. Use this - if you are connecting to a subscription owned by a different project than this topic. Note: + The topic owner's Google Cloud project ID. Either this or ``auth`` is required. Use this + if you are connecting to a subscription owned by a different project than this topic. :class:`pittgoogle.registry.ProjectIds` is a registry containing Pitt-Google's project IDs. auth (Auth, optional): Credentials for the Google Cloud project that owns this topic. If not provided, @@ -107,8 +101,6 @@ class Topic: .. code-block:: python - import pittgoogle - # Create a new topic in your project my_topic = pittgoogle.Topic(name="my-new-topic") my_topic.touch() @@ -148,11 +140,11 @@ def from_cloud( survey: Optional[str] = None, testid: Optional[str] = None, ): - """Creates a `Topic` with a `client` using implicit credentials (no explicit `auth`). + """Creates a :class:`Topic` with a :attr:`Topic.client` that uses implicit credentials. Args: name (str): - Name of the topic. If `survey` and/or `testid` are provided, they will be added to this + Name of the topic. If ``survey`` and/or ``testid`` are provided, they will be added to this name following the Pitt-Google naming syntax. projectid (str): Project ID of the Google Cloud project that owns this resource. Project IDs used by @@ -162,8 +154,8 @@ def from_cloud( Name of the survey. If provided, it will be prepended to `name` following the Pitt-Google naming syntax. testid (str, optional): - Pipeline identifier. If this is not `None`, `False`, or `"False"`, it will be appended to - the `name` following the Pitt-Google naming syntax. This is used to allow pipeline modules + Pipeline identifier. If this is not None, False, or "False", it will be appended to + the ``name`` following the Pitt-Google naming syntax. This is used to allow pipeline modules to find the correct resources without interfering with other pipelines that may have deployed resources with the same base names (e.g., for development and testing purposes). 
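+
+        Example:
+
+            A sketch with hypothetical values (run from code deployed in Google Cloud; following
+            the naming syntax above, the resulting topic name is 'mysurvey-alerts-mytest'):
+
+            .. code-block:: python
+
+                topic = pittgoogle.Topic.from_cloud(
+                    "alerts", projectid="my-project-id", survey="mysurvey", testid="mytest"
+                )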
""" @@ -177,7 +169,7 @@ def from_cloud( @classmethod def from_path(cls, path) -> "Topic": - """Parse the `path` and return a new `Topic`.""" + """Parse the ``path`` and return a new :class:`Topic`.""" _, projectid, _, name = path.split("/") return cls(name, projectid) @@ -189,11 +181,6 @@ def auth(self) -> Auth: """ if self._auth is None: self._auth = Auth() - - if (self._projectid != self._auth.GOOGLE_CLOUD_PROJECT) and (self._projectid is not None): - LOGGER.warning(f"setting projectid to match auth: {self._auth.GOOGLE_CLOUD_PROJECT}") - self._projectid = self._auth.GOOGLE_CLOUD_PROJECT - return self._auth @property @@ -221,14 +208,51 @@ def client(self) -> google.cloud.pubsub_v1.PublisherClient: return self._client def touch(self) -> None: - """Test the connection to the topic, creating it if necessary.""" + """Test the connection to the topic, creating it if necessary. + + .. tip: This is only necessary if you need to interact with the topic directly to do things like + *publish* messages. In particular, this is *not* necessary if you are trying to *pull* messages. + All users can create a subscription to a Pitt-Google topic and pull messages from it, even + if they can't actually touch the topic. + + Raises: + CloudConnectionError: + 'PermissionDenied' if :attr:`Topic.auth` does not have permission to get or create the table. + """ try: + # Check if topic exists and we can connect. self.client.get_topic(topic=self.path) LOGGER.info(f"topic exists: {self.path}") except google.api_core.exceptions.NotFound: - self.client.create_topic(name=self.path) - LOGGER.info(f"topic created: {self.path}") + try: + # Try to create a new topic. + self.client.create_topic(name=self.path) + LOGGER.info(f"topic created: {self.path}") + + except google.api_core.exceptions.PermissionDenied as excep: + # User has access to this topic's project but insufficient permissions to create a new topic. + # Assume this is a simple IAM problem rather than the user being confused about when + # to call this method (as can happen below). + msg = ( + "PermissionDenied: You seem to have appropriate IAM permissions to get topics " + "in this project but not to create them." + ) + raise exceptions.CloudConnectionError(msg) from excep + + except google.api_core.exceptions.PermissionDenied as excep: + # User does not have permission to get this topic. + # This is not a problem if they only want to subscribe, but can be confusing. + # [TODO] Maybe users should just be allowed to get the topic? + msg = ( + f"PermissionDenied: The provided `pittgoogle.Auth` cannot get topic {self.path}. " + "Either the provided Auth has a different project ID, or your credentials just don't " + "have appropriate IAM permissions. \nNote that if you are a user trying to connect to " + "a Pitt-Google topic, your Auth is _expected_ to have a different project ID and you " + "can safely ignore this error (and avoid running `Topic.touch` in the future). " + "It does not impact your ability to attach a subscription and pull messages." 
+ ) + raise exceptions.CloudConnectionError(msg) from excep def delete(self) -> None: """Delete the topic.""" @@ -240,7 +264,8 @@ def delete(self) -> None: LOGGER.info(f"deleted topic: {self.path}") def publish(self, alert: "Alert") -> int: - """Publish a message with ``alert.dict`` as the payload and ``alert.attributes`` as the attributes.""" + """Publish a message with :attr:`pittgoogle.Alert.dict` as the payload and + :attr:`pittgoogle.Alert.attributes` as the attributes.""" # Pub/Sub requires attribute keys and values to be strings. Sort the keys while we're at it. attributes = {str(key): str(alert.attributes[key]) for key in sorted(alert.attributes)} message = alert.schema.serialize(alert.dict) @@ -257,13 +282,13 @@ class Subscription: name (str): Name of the Pub/Sub subscription. auth (Auth, optional): - Credentials for the Google Cloud project that owns this subscription. If not provided, it will be created - from environment variables. + Credentials for the Google Cloud project that will be used to connect to the subscription. + If not provided, it will be created from environment variables. topic (Topic, optional): Topic this subscription should be attached to. Required only when the subscription needs to be created. client (google.cloud.pubsub_v1.SubscriberClient, optional): - Pub/Sub client that will be used to access the subscription. This kwarg is useful if you want to - reuse a client. If None, a new client will be created. + Pub/Sub client that will be used to access the subscription. + If not provided, a new client will be created the first time it is needed. schema_name (str): Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.alert.Alert` for unpacking. If not provided, some properties of the Alert may not be available. For a list of schema names, see @@ -271,30 +296,21 @@ class Subscription: Example: - Create a subscription to the "ztf-loop" topic: + Create a subscription to Pitt-Google's 'ztf-loop' topic and pull messages: .. code-block:: python - # We must provide the topic that the subscription should be connected to. - # If the topic is in your own project, you can just provide the name of the topic. - # Otherwise, you must also provide the topic's project ID. - topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle) - topic.touch() # make sure the topic exists and we can connect to it - - # You can choose your own name for the subscription. - # It is common to name it the same as the topic, but here we'll be more verbose. - # You may also need to provide the schema name (see :attr:`pittgoogle.registry.Schemas.names`). - subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf") + # Topic that the subscription should be connected to + topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds().pittgoogle) - # Create the subscription if it doesn't already exist. + # Create the subscription + subscription = pittgoogle.Subscription( + name="my-ztf-loop-subscription", topic=topic, schema_name="ztf" + ) subscription.touch() - Pull a small batch of alerts. Helpful for testing. (Not recommended for long-running listeners; - use :class:`pittgoogle.pubsub.Consumer` instead.) - - .. 
code-block:: python
-
-        alerts = subscription.pull_batch(subscription, max_messages=4)  # list of pittgoogle.Alert objects
+            # Pull a small batch of alerts
+            alerts = subscription.pull_batch(max_messages=4)
 
     ----
     """
@@ -344,18 +360,18 @@ def touch(self) -> None:
             TypeError:
                 if the subscription needs to be created but no topic was provided.
 
-            google.api_core.exceptions.NotFound:
-                if the subscription needs to be created but the topic does not exist in Google Cloud.
-
             CloudConnectionError:
-                if the subscription exists but it is not attached to self.topic and self.topic is not None.
+
+                - 'NotFound' if the subscription needs to be created but the topic does not exist in Google Cloud.
+                - 'InvalidTopic' if the subscription exists but the user explicitly provided a topic that
+                  this subscription is not actually attached to.
         """
         try:
            subscrip = self.client.get_subscription(subscription=self.path)
            LOGGER.info(f"subscription exists: {self.path}")
        except google.api_core.exceptions.NotFound:
-            subscrip = self._create()  # may raise TypeError or (topic) NotFound
+            subscrip = self._create()  # may raise TypeError or CloudConnectionError
            LOGGER.info(f"subscription created: {self.path}")
 
        self._set_topic(subscrip.topic)  # may raise CloudConnectionError
@@ -369,17 +385,17 @@ def _create(self) -> google.cloud.pubsub_v1.types.Subscription:
 
        # this error message is not very clear. let's help.
        except google.api_core.exceptions.NotFound as excep:
-            msg = f"The subscription cannot be created because the topic does not exist: {self.topic.path}"
-            raise google.api_core.exceptions.NotFound(msg) from excep
+            msg = f"NotFound: The subscription cannot be created because the topic does not exist: {self.topic.path}"
+            raise exceptions.CloudConnectionError(msg) from excep
 
    def _set_topic(self, connected_topic_path) -> None:
        # if the topic is invalid, raise an error
        if (self.topic is not None) and (connected_topic_path != self.topic.path):
            msg = (
-                "The subscription exists but is attached to a different topic.\n"
+                "InvalidTopic: The subscription exists but is attached to a different topic.\n"
                f"\tFound topic: {connected_topic_path}\n"
                f"\tExpected topic: {self.topic.path}\n"
-                "Either point to the found topic or delete the existing subscription and try again."
+                "Either use the found topic or delete the existing subscription and try again."
            )
            raise exceptions.CloudConnectionError(msg)
 
@@ -403,8 +419,9 @@ def pull_batch(self, max_messages: int = 1) -> List["Alert"]:
        This method is recommended for use cases that need a small number of alerts on-demand,
        often for testing and development.
 
-        This method is *not* recommended for long-running listeners as it is likely to be unstable
-        -- use :meth:`~Consumer.stream` instead.
+        This method is *not* recommended for long-running listeners as it is likely to be unstable.
+        Use :meth:`Consumer.stream` instead. This is Google's recommendation about how to use the
+        Google API that underpins these pittgoogle methods.
 
        Args:
            max_messages (int):
@@ -649,7 +666,7 @@ def pull_batch(self, max_messages: int = 1) -> List["Alert"]:
 
 @attrs.define(kw_only=True, frozen=True)
 class Response:
-    """Container for a response, to be returned by a :meth:`pittgoogle.pubsub.Consumer.msg_callback`.
+    """Container for a response, to be returned by a :meth:`Consumer.msg_callback`.
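+
+    For example, a user's :meth:`Consumer.msg_callback` might return one like this (a sketch;
+    ``my_classifier`` stands in for a hypothetical user-supplied function):
+
+    .. code-block:: python
+
+        def msg_callback(alert: pittgoogle.Alert) -> pittgoogle.pubsub.Response:
+            result = my_classifier(alert.dict)  # hypothetical science processing
+            return pittgoogle.pubsub.Response(ack=True, result=result)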

    Args:
        ack (bool):
diff --git a/pittgoogle/schema.py b/pittgoogle/schema.py
index 9402c97..6532c16 100644
--- a/pittgoogle/schema.py
+++ b/pittgoogle/schema.py
@@ -103,22 +103,22 @@ def lsst_auto_schema_helper(schema_dict: dict) -> "Schema":
     # Parse major and minor versions out of schema.name. Expecting syntax "lsst.v<MAJOR>_<MINOR>.alert".
     try:
         major, minor = map(int, re.findall(r"\d+", schema.name))
-    except ValueError:
+    except ValueError as excep:
         msg = (
             f"Unable to identify major and minor version. Please use the syntax "
             "'lsst.v<MAJOR>_<MINOR>.alert', replacing '<MAJOR>' and '<MINOR>' with integers. "
             f"{version_msg}"
         )
-        raise exceptions.SchemaError(msg)
+        raise exceptions.SchemaError(msg) from excep
 
     schema_dir = Path(lsst.alert.packet.schema.get_schema_path(major, minor))
     schema.path = schema_dir / f"{schema.name}.avsc"
 
     try:
         schema.definition = lsst.alert.packet.schema.Schema.from_file(schema.path).definition
-    except fastavro.repository.SchemaRepositoryError:
+    except fastavro.repository.SchemaRepositoryError as excep:
         msg = f"Unable to load the schema. {version_msg}"
-        raise exceptions.SchemaError(msg)
+        raise exceptions.SchemaError(msg) from excep
 
     return schema
 
@@ -232,8 +232,8 @@ def deserialize(self, alert_bytes: bytes) -> dict:
         except Exception:
             try:
                 return utils.Cast.json_to_dict(alert_bytes)
-            except Exception:
-                raise exceptions.SchemaError("Failed to deserialize the alert bytes")
+            except Exception as excep:
+                raise exceptions.SchemaError("Failed to deserialize the alert bytes") from excep
 
 
 class _SchemalessAvroSchema(Schema):
diff --git a/poetry.lock b/poetry.lock
index 20bcb40..0d4a8c2 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -307,6 +307,23 @@ files = [
 [package.extras]
 toml = ["tomli"]
 
+[[package]]
+name = "db-dtypes"
+version = "1.2.0"
+description = "Pandas Data Types for SQL systems (BigQuery, Spanner)"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "db-dtypes-1.2.0.tar.gz", hash = "sha256:3531bb1fb8b5fbab33121fe243ccc2ade16ab2524f4c113b05cc702a1908e6ea"},
+    {file = "db_dtypes-1.2.0-py2.py3-none-any.whl", hash = "sha256:6320bddd31d096447ef749224d64aab00972ed20e4392d86f7d8b81ad79f7ff0"},
+]
+
+[package.dependencies]
+numpy = ">=1.16.6"
+packaging = ">=17.0"
+pandas = ">=0.24.2"
+pyarrow = ">=3.0.0"
+
 [[package]]
 name = "docutils"
 version = "0.20.1"
@@ -379,12 +396,12 @@ files = [
 google-auth = ">=2.14.1,<3.0.dev0"
 googleapis-common-protos = ">=1.56.2,<2.0.dev0"
 grpcio = [
-    {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
     {version = ">=1.33.2,<2.0dev", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""},
+    {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
 ]
 grpcio-status = [
-    {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
     {version = ">=1.33.2,<2.0.dev0", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""},
+    {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
 ]
 proto-plus = ">=1.22.3,<2.0.0dev"
 protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<6.0.0.dev0
@@ -503,8 +520,8 @@ grpc-google-iam-v1 = ">=0.12.4,<1.0.0dev"
 grpcio = ">=1.51.3,<2.0dev"
 grpcio-status = ">=1.33.2"
 proto-plus = [
-    {version = ">=1.22.2,<2.0.0dev", markers = "python_version >= \"3.11\""},
{version = ">=1.22.0,<2.0.0dev", markers = "python_version < \"3.11\""}, + {version = ">=1.22.2,<2.0.0dev", markers = "python_version >= \"3.11\""}, ] protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<6.0.0dev" @@ -1036,9 +1053,9 @@ files = [ [package.dependencies] numpy = [ + {version = ">=1.22.4", markers = "python_version < \"3.11\""}, {version = ">=1.23.2", markers = "python_version == \"3.11\""}, {version = ">=1.26.0", markers = "python_version >= \"3.12\""}, - {version = ">=1.22.4", markers = "python_version < \"3.11\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -1106,6 +1123,57 @@ files = [ {file = "protobuf-5.27.2.tar.gz", hash = "sha256:f3ecdef226b9af856075f28227ff2c90ce3a594d092c39bee5513573f25e2714"}, ] +[[package]] +name = "pyarrow" +version = "17.0.0" +description = "Python library for Apache Arrow" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pyarrow-17.0.0-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:a5c8b238d47e48812ee577ee20c9a2779e6a5904f1708ae240f53ecbee7c9f07"}, + {file = "pyarrow-17.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:db023dc4c6cae1015de9e198d41250688383c3f9af8f565370ab2b4cb5f62655"}, + {file = "pyarrow-17.0.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:da1e060b3876faa11cee287839f9cc7cdc00649f475714b8680a05fd9071d545"}, + {file = "pyarrow-17.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:75c06d4624c0ad6674364bb46ef38c3132768139ddec1c56582dbac54f2663e2"}, + {file = "pyarrow-17.0.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:fa3c246cc58cb5a4a5cb407a18f193354ea47dd0648194e6265bd24177982fe8"}, + {file = "pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:f7ae2de664e0b158d1607699a16a488de3d008ba99b3a7aa5de1cbc13574d047"}, + {file = "pyarrow-17.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:5984f416552eea15fd9cee03da53542bf4cddaef5afecefb9aa8d1010c335087"}, + {file = "pyarrow-17.0.0-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:1c8856e2ef09eb87ecf937104aacfa0708f22dfeb039c363ec99735190ffb977"}, + {file = "pyarrow-17.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2e19f569567efcbbd42084e87f948778eb371d308e137a0f97afe19bb860ccb3"}, + {file = "pyarrow-17.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b244dc8e08a23b3e352899a006a26ae7b4d0da7bb636872fa8f5884e70acf15"}, + {file = "pyarrow-17.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b72e87fe3e1db343995562f7fff8aee354b55ee83d13afba65400c178ab2597"}, + {file = "pyarrow-17.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:dc5c31c37409dfbc5d014047817cb4ccd8c1ea25d19576acf1a001fe07f5b420"}, + {file = "pyarrow-17.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:e3343cb1e88bc2ea605986d4b94948716edc7a8d14afd4e2c097232f729758b4"}, + {file = "pyarrow-17.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:a27532c38f3de9eb3e90ecab63dfda948a8ca859a66e3a47f5f42d1e403c4d03"}, + {file = "pyarrow-17.0.0-cp312-cp312-macosx_10_15_x86_64.whl", hash = "sha256:9b8a823cea605221e61f34859dcc03207e52e409ccf6354634143e23af7c8d22"}, + {file = "pyarrow-17.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f1e70de6cb5790a50b01d2b686d54aaf73da01266850b05e3af2a1bc89e16053"}, + {file = "pyarrow-17.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:0071ce35788c6f9077ff9ecba4858108eebe2ea5a3f7cf2cf55ebc1dbc6ee24a"}, + {file = "pyarrow-17.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:757074882f844411fcca735e39aae74248a1531367a7c80799b4266390ae51cc"}, + {file = "pyarrow-17.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:9ba11c4f16976e89146781a83833df7f82077cdab7dc6232c897789343f7891a"}, + {file = "pyarrow-17.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:b0c6ac301093b42d34410b187bba560b17c0330f64907bfa4f7f7f2444b0cf9b"}, + {file = "pyarrow-17.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:392bc9feabc647338e6c89267635e111d71edad5fcffba204425a7c8d13610d7"}, + {file = "pyarrow-17.0.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:af5ff82a04b2171415f1410cff7ebb79861afc5dae50be73ce06d6e870615204"}, + {file = "pyarrow-17.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:edca18eaca89cd6382dfbcff3dd2d87633433043650c07375d095cd3517561d8"}, + {file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7c7916bff914ac5d4a8fe25b7a25e432ff921e72f6f2b7547d1e325c1ad9d155"}, + {file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f553ca691b9e94b202ff741bdd40f6ccb70cdd5fbf65c187af132f1317de6145"}, + {file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:0cdb0e627c86c373205a2f94a510ac4376fdc523f8bb36beab2e7f204416163c"}, + {file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:d7d192305d9d8bc9082d10f361fc70a73590a4c65cf31c3e6926cd72b76bc35c"}, + {file = "pyarrow-17.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:02dae06ce212d8b3244dd3e7d12d9c4d3046945a5933d28026598e9dbbda1fca"}, + {file = "pyarrow-17.0.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:13d7a460b412f31e4c0efa1148e1d29bdf18ad1411eb6757d38f8fbdcc8645fb"}, + {file = "pyarrow-17.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9b564a51fbccfab5a04a80453e5ac6c9954a9c5ef2890d1bcf63741909c3f8df"}, + {file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:32503827abbc5aadedfa235f5ece8c4f8f8b0a3cf01066bc8d29de7539532687"}, + {file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a155acc7f154b9ffcc85497509bcd0d43efb80d6f733b0dc3bb14e281f131c8b"}, + {file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:dec8d129254d0188a49f8a1fc99e0560dc1b85f60af729f47de4046015f9b0a5"}, + {file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:a48ddf5c3c6a6c505904545c25a4ae13646ae1f8ba703c4df4a1bfe4f4006bda"}, + {file = "pyarrow-17.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:42bf93249a083aca230ba7e2786c5f673507fa97bbd9725a1e2754715151a204"}, + {file = "pyarrow-17.0.0.tar.gz", hash = "sha256:4beca9521ed2c0921c1023e68d097d0299b62c362639ea315572a58f3f50fd28"}, +] + +[package.dependencies] +numpy = ">=1.16.6" + +[package.extras] +test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] + [[package]] name = "pyasn1" version = "0.6.0" @@ -1583,4 +1651,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "8cca48c9ce56987e503a2f237998de4c9e2456e48bbbebcf60b40ec7d7ea0f3b" +content-hash = "2422a454474d7a4791b1e00297b4759d31bf92d0e10ddf396d61cc26c767e266" diff --git a/pyproject.toml b/pyproject.toml index 5cea334..93e31c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ 
classifiers = [ python = "^3.9" attrs = ">=23.1" astropy = ">=5.3" # v<6.0.0 required by supernnova 3.0.1 +db-dtypes = "^1.2.0" # Needed by google-cloud-bigquery. Poetry added this as "^"; leaving for now. fastavro = ">=1.7.4" google-auth-oauthlib = ">=1.0" google-cloud-bigquery = ">=3.11.2"
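
The `db-dtypes` pin above supports the new BigQuery -> pandas path: `google.cloud.bigquery` imports it when converting query results to a DataFrame. A minimal sketch of what this enables (assuming credentials are available from environment variables, as in the examples above; the DATE column should come back with the 'dbdate' dtype that db-dtypes provides):

```python
import pittgoogle

# Builds credentials from environment variables if none are provided.
bqclient = pittgoogle.bigquery.Client()

# to_dataframe() exercises db-dtypes while converting the results.
df = bqclient.query(query="SELECT 1 AS one, CURRENT_DATE() AS today")
print(df.dtypes)
```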