Skip to content

Commit

Permalink
fix(ingest/bigquery): Fixing lineage filter query (datahub-project#9114)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Oct 26, 2023
1 parent f402090 commit a96a512
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
"dataset_pattern is not set but schema_pattern is set, using schema_pattern as dataset_pattern. schema_pattern will be deprecated, please use dataset_pattern instead."
)
values["dataset_pattern"] = schema_pattern
+ dataset_pattern = schema_pattern
elif (
dataset_pattern != AllowDenyPattern.allow_all()
and schema_pattern != AllowDenyPattern.allow_all()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from google.cloud.datacatalog import lineage_v1
from google.cloud.logging_v2.client import Client as GCPLoggingClient

+ from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand Down Expand Up @@ -683,8 +684,11 @@ def _create_lineage_map(
self.report.num_skipped_lineage_entries_missing_data[e.project_id] += 1
continue

- if not self.config.dataset_pattern.allowed(
- destination_table.table_identifier.dataset
+ if not is_schema_allowed(
+ self.config.dataset_pattern,
+ destination_table.table_identifier.dataset,
+ destination_table.table_identifier.project_id,
+ self.config.match_fully_qualified_names,
) or not self.config.table_pattern.allowed(
destination_table.table_identifier.get_table_name()
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import humanfriendly

+ from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.configuration.time_window_config import (
BaseTimeWindowConfig,
get_time_bucket,
Expand Down Expand Up @@ -335,10 +336,11 @@ def get_time_window(self) -> Tuple[datetime, datetime]:
def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool:
return (
table_ref is not None
- and self.config.dataset_pattern.allowed(
- f"{table_ref.table_identifier.project_id}.{table_ref.table_identifier.dataset}"
- if self.config.match_fully_qualified_names
- else table_ref.table_identifier.dataset
+ and is_schema_allowed(
+ self.config.dataset_pattern,
+ table_ref.table_identifier.dataset,
+ table_ref.table_identifier.project_id,
+ self.config.match_fully_qualified_names,
)
and self.config.table_pattern.allowed(str(table_ref.table_identifier))
)
Expand Down

0 comments on commit a96a512

Please sign in to comment.