From 6a58493011e2fdfb0235829af5f9e25dfc4d31ca Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Fri, 27 Sep 2024 16:46:37 +0530 Subject: [PATCH] fix(ingest/bq): do not query PARTITIONS for biglake tables (#11463) --- .../source/bigquery_v2/bigquery_schema.py | 17 +++++- .../source/bigquery_v2/bigquery_schema_gen.py | 57 +++++++++++++------ .../bigquery_v2/bigquery_test_connection.py | 2 +- .../ingestion/source/bigquery_v2/profiler.py | 7 +++ .../ingestion/source/common/subtypes.py | 2 + .../bigquery_v2/bigquery_mcp_golden.json | 3 +- .../integration/bigquery_v2/test_bigquery.py | 8 ++- .../tests/unit/test_bigquery_source.py | 4 +- 8 files changed, 75 insertions(+), 25 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 7c1abe2ce35691..dd683559a007b3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -105,6 +105,7 @@ class BigqueryTable(BaseTable): long_term_billable_bytes: Optional[int] = None partition_info: Optional[PartitionInfo] = None columns_ignore_from_profiling: List[str] = field(default_factory=list) + external: bool = False @dataclass @@ -252,7 +253,16 @@ def get_datasets_for_project_id( self.report.num_list_datasets_api_requests += 1 datasets = self.bq_client.list_datasets(project_id, max_results=maxResults) return [ - BigqueryDataset(name=d.dataset_id, labels=d.labels) for d in datasets + BigqueryDataset( + name=d.dataset_id, + labels=d.labels, + location=( + d._properties.get("location") + if hasattr(d, "_properties") and isinstance(d._properties, dict) + else None + ), + ) + for d in datasets ] # This is not used anywhere @@ -295,12 +305,12 @@ def get_tables_for_dataset( dataset_name: str, tables: Dict[str, TableListItem], report: BigQueryV2Report, - with_data_read_permission: bool = False, + with_partitions: bool = False, ) -> Iterator[BigqueryTable]: with PerfTimer() as current_timer: filter_clause: str = ", ".join(f"'{table}'" for table in tables.keys()) - if with_data_read_permission: + if with_partitions: query_template = BigqueryQuery.tables_for_dataset else: query_template = BigqueryQuery.tables_for_dataset_without_partition_data @@ -374,6 +384,7 @@ def _make_bigquery_table( num_partitions=table.get("num_partitions"), active_billable_bytes=table.get("active_billable_bytes"), long_term_billable_bytes=table.get("long_term_billable_bytes"), + external=(table.table_type == BigqueryTableType.EXTERNAL), ) def get_views_for_dataset( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index 489773c5745ff2..6ea8f21e8b2916 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -242,7 +242,11 @@ def gen_project_id_containers(self, database: str) -> Iterable[MetadataWorkUnit] ) def gen_dataset_containers( - self, dataset: str, project_id: str, tags: Optional[Dict[str, str]] = None + self, + dataset: str, + project_id: str, + tags: Optional[Dict[str, str]] = None, + extra_properties: Optional[Dict[str, str]] = None, ) -> Iterable[MetadataWorkUnit]: schema_container_key = self.gen_dataset_key(project_id, dataset) @@ -272,6 +276,7 @@ def gen_dataset_containers( else None ), tags=tags_joined, + extra_properties=extra_properties, ) def _process_project( @@ -400,7 +405,14 @@ def _process_schema( if self.config.include_schema_metadata: yield from self.gen_dataset_containers( - dataset_name, project_id, bigquery_dataset.labels + dataset_name, + project_id, + bigquery_dataset.labels, + ( + {"location": bigquery_dataset.location} + if bigquery_dataset.location + else None + ), ) columns = None @@ -445,7 +457,7 @@ def _process_schema( if self.config.include_tables: db_tables[dataset_name] = list( - self.get_tables_for_dataset(project_id, dataset_name) + self.get_tables_for_dataset(project_id, bigquery_dataset) ) for table in db_tables[dataset_name]: @@ -686,7 +698,9 @@ def gen_table_dataset_workunits( if table.max_shard_id: custom_properties["max_shard_id"] = str(table.max_shard_id) custom_properties["is_sharded"] = str(True) - sub_types = ["sharded table"] + sub_types + sub_types = [DatasetSubTypes.SHARDED_TABLE] + sub_types + if table.external: + sub_types = [DatasetSubTypes.EXTERNAL_TABLE] + sub_types tags_to_add = None if table.labels and self.config.capture_table_label_as_tag: @@ -971,25 +985,36 @@ def gen_schema_metadata( def get_tables_for_dataset( self, project_id: str, - dataset_name: str, + dataset: BigqueryDataset, ) -> Iterable[BigqueryTable]: # In bigquery there is no way to query all tables in a Project id with PerfTimer() as timer: + + # PARTITIONS INFORMATION_SCHEMA view is not available for BigLake tables + # based on Amazon S3 and Blob Storage data. + # https://cloud.google.com/bigquery/docs/omni-introduction#limitations + # Omni Locations - https://cloud.google.com/bigquery/docs/omni-introduction#locations + with_partitions = self.config.have_table_data_read_permission and not ( + dataset.location + and dataset.location.lower().startswith(("aws-", "azure-")) + ) + # Partitions view throw exception if we try to query partition info for too many tables # so we have to limit the number of tables we query partition info. # The conn.list_tables returns table infos that information_schema doesn't contain and this # way we can merge that info with the queried one. # https://cloud.google.com/bigquery/docs/information-schema-partitions - max_batch_size: int = ( - self.config.number_of_datasets_process_in_batch - if not self.config.have_table_data_read_permission - else self.config.number_of_datasets_process_in_batch_if_profiling_enabled - ) + if with_partitions: + max_batch_size = ( + self.config.number_of_datasets_process_in_batch_if_profiling_enabled + ) + else: + max_batch_size = self.config.number_of_datasets_process_in_batch # We get the list of tables in the dataset to get core table properties and to be able to process the tables in batches # We collect only the latest shards from sharded tables (tables with _YYYYMMDD suffix) and ignore temporary tables table_items = self.get_core_table_details( - dataset_name, project_id, self.config.temp_table_dataset_prefix + dataset.name, project_id, self.config.temp_table_dataset_prefix ) items_to_get: Dict[str, TableListItem] = {} @@ -998,9 +1023,9 @@ def get_tables_for_dataset( if len(items_to_get) % max_batch_size == 0: yield from self.schema_api.get_tables_for_dataset( project_id, - dataset_name, + dataset.name, items_to_get, - with_data_read_permission=self.config.have_table_data_read_permission, + with_partitions=with_partitions, report=self.report, ) items_to_get.clear() @@ -1008,13 +1033,13 @@ def get_tables_for_dataset( if items_to_get: yield from self.schema_api.get_tables_for_dataset( project_id, - dataset_name, + dataset.name, items_to_get, - with_data_read_permission=self.config.have_table_data_read_permission, + with_partitions=with_partitions, report=self.report, ) - self.report.metadata_extraction_sec[f"{project_id}.{dataset_name}"] = round( + self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round( timer.elapsed_seconds(), 2 ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py index 27beb7b0254c41..fe64eeeb841399 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py @@ -113,7 +113,7 @@ def metadata_read_capability_test( project_id=project_id, dataset_name=result[0].name, tables={}, - with_data_read_permission=config.have_table_data_read_permission, + with_partitions=config.have_table_data_read_permission, report=BigQueryV2Report(), ) if len(list(tables)) == 0: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index 582c312f99098b..6af8166fbf70c3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -173,6 +173,13 @@ def get_workunits( f"^{normalized_table_name}.{column}$" ) + if table.external and not self.config.profiling.profile_external_tables: + self.report.profiling_skipped_other[f"{project_id}.{dataset}"] += 1 + logger.info( + f"Skipping profiling of external table {project_id}.{dataset}.{table.name}" + ) + continue + # Emit the profile work unit logger.debug( f"Creating profile request for table {normalized_table_name}" diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index 4bc120fbecf8f1..86c1c8db11b05f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -17,6 +17,8 @@ class DatasetSubTypes(StrEnum): POWERBI_DATASET_TABLE = "PowerBI Dataset Table" QLIK_DATASET = "Qlik Dataset" BIGQUERY_TABLE_SNAPSHOT = "Bigquery Table Snapshot" + SHARDED_TABLE = "Sharded Table" + EXTERNAL_TABLE = "External Table" SIGMA_DATASET = "Sigma Dataset" SAC_MODEL = "Model" SAC_IMPORT_DATA_MODEL = "Import Data Model" diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json index 537eeb56231498..fcf65130df9757 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json @@ -98,7 +98,8 @@ "platform": "bigquery", "env": "PROD", "project_id": "project-id-1", - "dataset_id": "bigquery-dataset-1" + "dataset_id": "bigquery-dataset-1", + "location": "US" }, "externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m4!1m3!3m2!1sproject-id-1!2sbigquery-dataset-1", "name": "bigquery-dataset-1", diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 1934e135457afa..f9481d1d83d8b2 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -103,7 +103,9 @@ def test_bigquery_v2_ingest( mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json") dataset_name = "bigquery-dataset-1" - get_datasets_for_project_id.return_value = [BigqueryDataset(name=dataset_name)] + get_datasets_for_project_id.return_value = [ + BigqueryDataset(name=dataset_name, location="US") + ] table_list_item = TableListItem( {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} @@ -321,7 +323,9 @@ def test_bigquery_queries_v2_ingest( mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json") dataset_name = "bigquery-dataset-1" - get_datasets_for_project_id.return_value = [BigqueryDataset(name=dataset_name)] + get_datasets_for_project_id.return_value = [ + BigqueryDataset(name=dataset_name, location="US") + ] table_list_item = TableListItem( {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 8ec19e5bb9e56f..38239d150dd6b5 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -814,7 +814,7 @@ def test_table_processing_logic( _ = list( schema_gen.get_tables_for_dataset( - project_id="test-project", dataset_name="test-dataset" + project_id="test-project", dataset=BigqueryDataset("test-dataset") ) ) @@ -890,7 +890,7 @@ def test_table_processing_logic_date_named_tables( _ = list( schema_gen.get_tables_for_dataset( - project_id="test-project", dataset_name="test-dataset" + project_id="test-project", dataset=BigqueryDataset("test-dataset") ) )