From d69966074af5f5edf9ceb94ad8dc5d2be8829c5c Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware <159135491+sagar-salvi-apptware@users.noreply.github.com> Date: Fri, 14 Jun 2024 16:43:12 +0530 Subject: [PATCH] fix(ingest/bigquery): Map BigQuery policy tags to datahub column-level tags (#10669) --- .../docs/sources/bigquery/bigquery_pre.md | 28 ++++--- .../recipes/bigquery_to_datahub.dhub.yaml | 1 + metadata-ingestion/setup.py | 1 + .../ingestion/source/bigquery_v2/bigquery.py | 21 ++++- .../source/bigquery_v2/bigquery_config.py | 15 +++- .../source/bigquery_v2/bigquery_schema.py | 81 ++++++++++++++++++- .../bigquery_v2/bigquery_mcp_golden.json | 22 ++++- .../integration/bigquery_v2/test_bigquery.py | 3 + 8 files changed, 153 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md b/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md index 0d856b915629d..d6efe9334f756 100644 --- a/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md +++ b/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md @@ -28,19 +28,21 @@ There are two important concepts to understand and identify: If you have multiple projects in your BigQuery setup, the role should be granted these permissions in each of the projects. ::: -| permission                       | Description                                                                                                 | Capability               | Default GCP role which contains this permission                                                                 | -|----------------------------------|--------------------------------------------------------------------------------------------------------------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------| -| `bigquery.datasets.get`         | Retrieve metadata about a dataset.                                                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | -| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions.                                                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | -| `bigquery.tables.list`           | List BigQuery tables.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | -| `bigquery.tables.get`           | Retrieve metadata for a table.                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | -| `bigquery.routines.get`           | Get Routines. Needs to retrieve metadata for a table from system table.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | -| `bigquery.routines.list`           | List Routines. Needs to retrieve metadata for a table from system table                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | -| `resourcemanager.projects.get`   | Retrieve project names and metadata.                                                                         | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | -| `bigquery.jobs.listAll`         | List all jobs (queries) submitted by any user. Needs for Lineage extraction.                                 | Lineage Extraction/Usage extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) | -| `logging.logEntries.list`       | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | -| `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | -| `bigquery.tables.getData`       | Access table data to extract storage size, last updated at, data profiles etc. | Profiling                           |                                                                                                                 | +| Permission | Description | Capability | Default GCP Role Which Contains This Permission | +|----------------------------------|-----------------------------------------------------------------------------------------------------------------|-------------------------------------|---------------------------------------------------------------------------| +| `bigquery.datasets.get` | Retrieve metadata about a dataset. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | +| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | +| `bigquery.tables.list` | List BigQuery tables. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | +| `bigquery.tables.get` | Retrieve metadata for a table. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | +| `bigquery.routines.get` | Get Routines. Needs to retrieve metadata for a table from system table. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | +| `bigquery.routines.list` | List Routines. Needs to retrieve metadata for a table from system table. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | +| `resourcemanager.projects.get` | Retrieve project names and metadata. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) | +| `bigquery.jobs.listAll` | List all jobs (queries) submitted by any user. Needs for Lineage extraction. | Lineage Extraction/Usage Extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) | +| `logging.logEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage Extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | +| `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage Extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | +| `bigquery.tables.getData` | Access table data to extract storage size, last updated at, data profiles etc. | Profiling | | +| `datacatalog.policyTags.get` | *Optional* Get policy tags for columns with associated policy tags. This permission is required only if `extract_policy_tags_from_catalog` is enabled. | Policy Tag Extraction | [roles/datacatalog.viewer](https://cloud.google.com/data-catalog/docs/access-control#permissions-and-roles) | + #### Create a service account in the Extractor Project diff --git a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml index 84f098fa06c5c..86f4898d9d502 100644 --- a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml +++ b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml @@ -16,6 +16,7 @@ source: #include_tables: true #include_views: true #include_table_lineage: true + #extract_policy_tags_from_catalog: true #start_time: 2021-12-15T20:08:23.091Z #end_time: 2023-12-15T20:08:23.091Z #profiling: diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 38b45fefe00c6..cd8c9d4541c1d 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -168,6 +168,7 @@ # Google cloud logging library "google-cloud-logging<=3.5.0", "google-cloud-bigquery", + "google-cloud-datacatalog>=1.5.0", "more-itertools>=8.12.0", "sqlalchemy-bigquery>=1.4.1", } diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index eecc0f4372969..b47f7450575e5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -130,6 +130,7 @@ ) from datahub.utilities.mapping import Constants from datahub.utilities.perf_timer import PerfTimer +from datahub.utilities.ratelimiter import RateLimiter from datahub.utilities.registries.domain_registry import DomainRegistry logger: logging.Logger = logging.getLogger(__name__) @@ -236,8 +237,14 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = "" self.bigquery_data_dictionary = BigQuerySchemaApi( - self.report.schema_api_perf, self.config.get_bigquery_client() + self.report.schema_api_perf, + self.config.get_bigquery_client(), ) + if self.config.extract_policy_tags_from_catalog: + self.bigquery_data_dictionary.datacatalog_client = ( + self.config.get_policy_tag_manager_client() + ) + self.sql_parser_schema_resolver = self._init_schema_resolver() self.data_reader: Optional[BigQueryDataReader] = None @@ -742,6 +749,12 @@ def _process_schema( columns = None + rate_limiter: Optional[RateLimiter] = None + if self.config.rate_limit: + rate_limiter = RateLimiter( + max_calls=self.config.requests_per_min, period=60 + ) + if ( self.config.include_tables or self.config.include_views @@ -752,6 +765,9 @@ def _process_schema( dataset_name=dataset_name, column_limit=self.config.column_limit, run_optimized_column_query=self.config.run_optimized_column_query, + extract_policy_tags_from_catalog=self.config.extract_policy_tags_from_catalog, + report=self.report, + rate_limiter=rate_limiter, ) if self.config.include_tables: @@ -1275,6 +1291,9 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]: ) ) + if col.policy_tags: + for policy_tag in col.policy_tags: + tags.append(TagAssociationClass(make_tag_urn(policy_tag))) field = SchemaField( fieldPath=col.name, type=SchemaFieldDataType( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index d1a6ed84a28ac..b4bfa3040d72a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -3,7 +3,7 @@ from datetime import timedelta from typing import Any, Dict, List, Optional, Union -from google.cloud import bigquery +from google.cloud import bigquery, datacatalog_v1 from google.cloud.logging_v2.client import Client as GCPLoggingClient from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator @@ -70,6 +70,9 @@ def get_bigquery_client(self) -> bigquery.Client: client_options = self.extra_client_options return bigquery.Client(self.project_on_behalf, **client_options) + def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient: + return datacatalog_v1.PolicyTagManagerClient() + def make_gcp_logging_client( self, project_id: Optional[str] = None ) -> GCPLoggingClient: @@ -226,6 +229,16 @@ class BigQueryV2Config( description="Use the legacy sharded table urn suffix added.", ) + extract_policy_tags_from_catalog: bool = Field( + default=False, + description=( + "This flag enables the extraction of policy tags from the Google Data Catalog API. " + "When enabled, the extractor will fetch policy tags associated with BigQuery table columns. " + "For more information about policy tags and column-level security, refer to the documentation: " + "https://cloud.google.com/bigquery/docs/column-level-security-intro" + ), + ) + scheme: str = "bigquery" log_page_size: PositiveInt = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index ca09496eda341..e610d8604a61a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -2,9 +2,9 @@ from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Dict, Iterable, Iterator, List, Optional -from google.cloud import bigquery +from google.cloud import bigquery, datacatalog_v1 from google.cloud.bigquery.table import ( RowIterator, TableListItem, @@ -22,6 +22,7 @@ BigqueryTableType, ) from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView +from datahub.utilities.ratelimiter import RateLimiter logger: logging.Logger = logging.getLogger(__name__) @@ -31,6 +32,7 @@ class BigqueryColumn(BaseColumn): field_path: str is_partition_column: bool cluster_column_position: Optional[int] + policy_tags: Optional[List[str]] = None RANGE_PARTITION_NAME: str = "RANGE" @@ -137,10 +139,14 @@ class BigqueryProject: class BigQuerySchemaApi: def __init__( - self, report: BigQuerySchemaApiPerfReport, client: bigquery.Client + self, + report: BigQuerySchemaApiPerfReport, + client: bigquery.Client, + datacatalog_client: Optional[datacatalog_v1.PolicyTagManagerClient] = None, ) -> None: self.bq_client = client self.report = report + self.datacatalog_client = datacatalog_client def get_query_result(self, query: str) -> RowIterator: logger.debug(f"Query : {query}") @@ -347,12 +353,69 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView: rows_count=view.get("row_count"), ) + def get_policy_tags_for_column( + self, + project_id: str, + dataset_name: str, + table_name: str, + column_name: str, + report: BigQueryV2Report, + rate_limiter: Optional[RateLimiter] = None, + ) -> Iterable[str]: + assert self.datacatalog_client + + try: + # Get the table schema + table_ref = f"{project_id}.{dataset_name}.{table_name}" + table = self.bq_client.get_table(table_ref) + schema = table.schema + + # Find the specific field in the schema + field = next((f for f in schema if f.name == column_name), None) + if not field or not field.policy_tags: + return + + # Retrieve policy tag display names + for policy_tag_name in field.policy_tags.names: + try: + if rate_limiter: + with rate_limiter: + policy_tag = self.datacatalog_client.get_policy_tag( + name=policy_tag_name + ) + else: + policy_tag = self.datacatalog_client.get_policy_tag( + name=policy_tag_name + ) + yield policy_tag.display_name + except Exception as e: + logger.warning( + f"Unexpected error when retrieving policy tag {policy_tag_name} for column {column_name} in table {table_name}: {e}", + exc_info=True, + ) + report.report_warning( + "metadata-extraction", + f"Failed to retrieve policy tag {policy_tag_name} for column {column_name} in table {table_name} due to unexpected error: {e}", + ) + except Exception as e: + logger.error( + f"Unexpected error retrieving schema for table {table_name} in dataset {dataset_name}, project {project_id}: {e}", + exc_info=True, + ) + report.report_warning( + "metadata-extraction", + f"Failed to retrieve schema for table {table_name} in dataset {dataset_name}, project {project_id} due to unexpected error: {e}", + ) + def get_columns_for_dataset( self, project_id: str, dataset_name: str, column_limit: int, + report: BigQueryV2Report, run_optimized_column_query: bool = False, + extract_policy_tags_from_catalog: bool = False, + rate_limiter: Optional[RateLimiter] = None, ) -> Optional[Dict[str, List[BigqueryColumn]]]: columns: Dict[str, List[BigqueryColumn]] = defaultdict(list) with self.report.get_columns_for_dataset: @@ -397,6 +460,18 @@ def get_columns_for_dataset( comment=column.comment, is_partition_column=column.is_partitioning_column == "YES", cluster_column_position=column.clustering_ordinal_position, + policy_tags=list( + self.get_policy_tags_for_column( + project_id, + dataset_name, + column.table_name, + column.column_name, + report, + rate_limiter, + ) + ) + if extract_policy_tags_from_catalog + else [], ) ) diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json index b7e0c0169cccb..e7b2a7c4a9f4b 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json @@ -249,7 +249,11 @@ "nativeDataType": "INT", "recursive": false, "globalTags": { - "tags": [] + "tags": [ + { + "tag": "urn:li:tag:Test Policy Tag" + } + ] }, "glossaryTerms": { "terms": [ @@ -428,5 +432,21 @@ "runId": "bigquery-2022_02_03-07_00_00", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Test Policy Tag", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Test Policy Tag" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 26511d9e5df1a..a24b6174eb925 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -44,8 +44,10 @@ def random_email(): @patch.object(BigQuerySchemaApi, "get_columns_for_dataset") @patch.object(BigQueryDataReader, "get_sample_data_for_table") @patch("google.cloud.bigquery.Client") +@patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") def test_bigquery_v2_ingest( client, + policy_tag_manager_client, get_sample_data_for_table, get_columns_for_dataset, get_datasets_for_project_id, @@ -78,6 +80,7 @@ def test_bigquery_v2_ingest( comment="comment", is_partition_column=False, cluster_column_position=None, + policy_tags=["Test Policy Tag"], ), BigqueryColumn( name="email",