fix(ingestion/lookml): resolve CLL issue caused by column name casing.

sid-acryl authored Dec 12, 2024
1 parent c0b49a6 commit 2ec9cb0

Showing 9 changed files with 614 additions and 54 deletions.
@@ -88,8 +88,7 @@ def column_name_in_sql_attribute(self) -> List[str]:
        for upstream_field_match in re.finditer(r"\${TABLE}\.[\"]*([\.\w]+)", sql):
            matched_field = upstream_field_match.group(1)
            # Remove quotes from field names
-            matched_field = matched_field.replace('"', "").replace("`", "").lower()
-            column_names.append(matched_field)
+            column_names.append(matched_field.replace('"', "").replace("`", "").lower())

        return column_names
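A standalone sketch of the extraction above, run against a hypothetical `sql` value (the regex is the one from the diff; the column names are invented):

import re

sql = 'coalesce(${TABLE}."Customer_ID", ${TABLE}.Order_Total)'

column_names = []
for upstream_field_match in re.finditer(r"\${TABLE}\.[\"]*([\.\w]+)", sql):
    matched_field = upstream_field_match.group(1)
    # Strip quote characters and normalize casing, as the patched line does
    column_names.append(matched_field.replace('"', "").replace("`", "").lower())

print(column_names)  # ['customer_id', 'order_total']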

@@ -25,11 +25,13 @@
    LookMLSourceReport,
)
from datahub.ingestion.source.looker.urn_functions import get_qualified_table_name
+from datahub.sql_parsing.schema_resolver import match_columns_to_schema
from datahub.sql_parsing.sqlglot_lineage import (
    ColumnLineageInfo,
    ColumnRef,
    SqlParsingResult,
    Urn,
+    create_and_cache_schema_resolver,
    create_lineage_sql_parsed_result,
)

Expand Down Expand Up @@ -200,7 +202,7 @@ def _generate_fully_qualified_name(
class AbstractViewUpstream(ABC):
"""
Implementation of this interface extracts the view upstream as per the way the view is bound to datasets.
For detail explanation please refer lookml_concept_context.LookerViewContext documentation.
For detail explanation, please refer lookml_concept_context.LookerViewContext documentation.
"""

view_context: LookerViewContext
@@ -236,6 +238,47 @@ def get_upstream_dataset_urn(self) -> List[Urn]:
    def create_fields(self) -> List[ViewField]:
        return []  # it is for the special case

+    def create_upstream_column_refs(
+        self, upstream_urn: str, downstream_looker_columns: List[str]
+    ) -> List[ColumnRef]:
+        """
+        - **`upstream_urn`**: The URN of the upstream dataset.
+        - **`downstream_looker_columns`**: The columns identified by the Looker connector as belonging to the `upstream_urn` dataset. There is potential for human error in declaring these columns: a user might declare a column in lowercase while it exists in uppercase on the actual platform, or vice versa.
+        - This function ensures consistency in column-level lineage by consulting GMS before creating the final `ColumnRef` instances, avoiding such discrepancies.
+        """
+        schema_resolver = create_and_cache_schema_resolver(
+            platform=self.view_context.view_connection.platform,
+            platform_instance=self.view_context.view_connection.platform_instance,
+            env=self.view_context.view_connection.platform_env or self.config.env,
+            graph=self.ctx.graph,
+        )
+
+        urn, schema_info = schema_resolver.resolve_urn(urn=upstream_urn)
+
+        if schema_info:
+            actual_columns = match_columns_to_schema(
+                schema_info, downstream_looker_columns
+            )
+        else:
+            logger.info(
+                f"schema_info not found for dataset {urn} in GMS. Using downstream_looker_columns to form ColumnRef"
+            )
+            actual_columns = [column.lower() for column in downstream_looker_columns]
+
+        upstream_column_refs: List[ColumnRef] = []
+
+        for column in actual_columns:
+            upstream_column_refs.append(
+                ColumnRef(
+                    column=column,
+                    table=upstream_urn,
+                )
+            )
+
+        return upstream_column_refs
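A minimal sketch of the two branches above, with hypothetical inputs; `match_columns_to_schema` is the helper added to schema_resolver.py further down:

from datahub.sql_parsing.schema_resolver import match_columns_to_schema

downstream_looker_columns = ["id", "name"]

# Branch 1: GMS returned a schema -- adopt the platform's casing.
schema_info = {"ID": "NUMBER", "Name": "VARCHAR"}  # hypothetical GMS schema
print(match_columns_to_schema(schema_info, downstream_looker_columns))  # ['ID', 'Name']

# Branch 2: no schema found in GMS -- fall back to lowercasing.
print([column.lower() for column in downstream_looker_columns])  # ['id', 'name']

Either way, each resulting column name is then wrapped in a ColumnRef pointing at upstream_urn.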


class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC):
"""
@@ -372,15 +415,12 @@ def get_upstream_column_ref(
        # in-case of "select * from look_ml_view.SQL_TABLE_NAME" or extra fields defined in the looker view which are
        # referring to upstream table
        if self._get_upstream_dataset_urn() and not upstreams_column_refs:
-            upstreams_column_refs = [
-                ColumnRef(
-                    table=self._get_upstream_dataset_urn()[
-                        0
-                    ],  # 0th index has table of from clause
-                    column=column,
-                )
-                for column in field_context.column_name_in_sql_attribute()
-            ]
+            upstreams_column_refs = self.create_upstream_column_refs(
+                upstream_urn=self._get_upstream_dataset_urn()[
+                    0
+                ],  # 0th index has table of from clause
+                downstream_looker_columns=field_context.column_name_in_sql_attribute(),
+            )

        # fix any derived view reference present in urn
        upstreams_column_refs = resolve_derived_view_urn_of_col_ref(
@@ -487,18 +527,18 @@ def get_upstream_column_ref(
            return upstream_column_refs

        explore_urn: str = self._get_upstream_dataset_urn()[0]
+        expected_columns: List[str] = []

        for column in field_context.column_name_in_sql_attribute():
            if column in self._get_explore_column_mapping():
                explore_column: Dict = self._get_explore_column_mapping()[column]
-                upstream_column_refs.append(
-                    ColumnRef(
-                        column=explore_column.get("field", explore_column[NAME]),
-                        table=explore_urn,
-                    )
+                expected_columns.append(
+                    explore_column.get("field", explore_column[NAME])
                )

-        return upstream_column_refs
+        return self.create_upstream_column_refs(
+            upstream_urn=explore_urn, downstream_looker_columns=expected_columns
+        )

    def get_upstream_dataset_urn(self) -> List[Urn]:
        return self._get_upstream_dataset_urn()
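To illustrate the translation step above with a hypothetical explore-column mapping (NAME is the connector's constant for the "name" key): Looker columns are first mapped to explore field names, and only then case-reconciled by create_upstream_column_refs:

explore_column_mapping = {
    "id": {"name": "id", "field": "employee_id"},  # hypothetical "field" override
    "name": {"name": "name"},                      # no override
}

expected_columns = []
for column in ["id", "name", "not_in_explore"]:
    if column in explore_column_mapping:
        explore_column = explore_column_mapping[column]
        # Prefer the explicit "field" mapping, else fall back to the column's own name
        expected_columns.append(explore_column.get("field", explore_column["name"]))

print(expected_columns)  # ['employee_id', 'name']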
@@ -548,14 +588,10 @@ def __get_upstream_dataset_urn(self) -> Urn:
    def get_upstream_column_ref(
        self, field_context: LookerFieldContext
    ) -> List[ColumnRef]:
-        upstream_column_ref: List[ColumnRef] = []
-
-        for column_name in field_context.column_name_in_sql_attribute():
-            upstream_column_ref.append(
-                ColumnRef(table=self._get_upstream_dataset_urn(), column=column_name)
-            )
-
-        return upstream_column_ref
+        return self.create_upstream_column_refs(
+            upstream_urn=self._get_upstream_dataset_urn(),
+            downstream_looker_columns=field_context.column_name_in_sql_attribute(),
+        )

    def get_upstream_dataset_urn(self) -> List[Urn]:
        return [self._get_upstream_dataset_urn()]
@@ -609,15 +645,14 @@ def get_upstream_column_ref(
        self, field_context: LookerFieldContext
    ) -> List[ColumnRef]:
        upstream_column_ref: List[ColumnRef] = []

        if not self._get_upstream_dataset_urn():
            return upstream_column_ref

-        for column_name in field_context.column_name_in_sql_attribute():
-            upstream_column_ref.append(
-                ColumnRef(table=self._get_upstream_dataset_urn()[0], column=column_name)
-            )
-
-        return upstream_column_ref
+        return self.create_upstream_column_refs(
+            upstream_urn=self._get_upstream_dataset_urn()[0],
+            downstream_looker_columns=field_context.column_name_in_sql_attribute(),
+        )

    def get_upstream_dataset_urn(self) -> List[Urn]:
        return self._get_upstream_dataset_urn()
23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
@@ -123,6 +123,13 @@ def get_urn_for_table(
        )
        return urn

+    def resolve_urn(self, urn: str) -> Tuple[str, Optional[SchemaInfo]]:
+        schema_info = self._resolve_schema_info(urn)
+        if schema_info:
+            return urn, schema_info
+
+        return urn, None
+
    def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
        urn = self.get_urn_for_table(table)
@@ -293,3 +300,19 @@ def _convert_schema_field_list_to_info(

def _convert_schema_aspect_to_info(schema_metadata: SchemaMetadataClass) -> SchemaInfo:
    return _convert_schema_field_list_to_info(schema_metadata.fields)
+
+
+def match_columns_to_schema(
+    schema_info: SchemaInfo, input_columns: List[str]
+) -> List[str]:
+    column_from_gms: List[str] = list(schema_info.keys())  # list() to silence the linter
+
+    gms_column_map: Dict[str, str] = {
+        column.lower(): column for column in column_from_gms
+    }
+
+    output_columns: List[str] = [
+        gms_column_map.get(column.lower(), column) for column in input_columns
+    ]
+
+    return output_columns
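The helper's semantics, shown with hypothetical columns: matching against the GMS schema is case-insensitive, and input names with no match pass through unchanged:

schema_info = {"Customer_Id": "NUMBER", "ORDER_TOTAL": "NUMBER"}
input_columns = ["customer_id", "order_total", "ad_hoc"]

print(match_columns_to_schema(schema_info, input_columns))
# ['Customer_Id', 'ORDER_TOTAL', 'ad_hoc']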
61 changes: 48 additions & 13 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
@@ -1181,6 +1181,45 @@ def sqlglot_lineage(
    )


+@functools.lru_cache(maxsize=128)
+def create_and_cache_schema_resolver(
+    platform: str,
+    env: str,
+    graph: Optional[DataHubGraph] = None,
+    platform_instance: Optional[str] = None,
+    schema_aware: bool = True,
+) -> SchemaResolver:
+    return create_schema_resolver(
+        platform=platform,
+        env=env,
+        graph=graph,
+        platform_instance=platform_instance,
+        schema_aware=schema_aware,
+    )
+
+
+def create_schema_resolver(
+    platform: str,
+    env: str,
+    graph: Optional[DataHubGraph] = None,
+    platform_instance: Optional[str] = None,
+    schema_aware: bool = True,
+) -> SchemaResolver:
+    if graph and schema_aware:
+        return graph._make_schema_resolver(
+            platform=platform,
+            platform_instance=platform_instance,
+            env=env,
+        )
+
+    return SchemaResolver(
+        platform=platform,
+        platform_instance=platform_instance,
+        env=env,
+        graph=None,
+    )


@@ -1191,21 +1230,17 @@ def create_lineage_sql_parsed_result(
    graph: Optional[DataHubGraph] = None,
    schema_aware: bool = True,
) -> SqlParsingResult:
+    schema_resolver = create_schema_resolver(
+        platform=platform,
+        platform_instance=platform_instance,
+        env=env,
+        schema_aware=schema_aware,
+        graph=graph,
+    )
+
+    needs_close: bool = True
    if graph and schema_aware:
        needs_close = False
-        schema_resolver = graph._make_schema_resolver(
-            platform=platform,
-            platform_instance=platform_instance,
-            env=env,
-        )
-    else:
-        needs_close = True
-        schema_resolver = SchemaResolver(
-            platform=platform,
-            platform_instance=platform_instance,
-            env=env,
-            graph=None,
-        )

    try:
        return sqlglot_lineage(
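One consequence of the functools.lru_cache wrapper: calls with identical arguments return the same SchemaResolver instance, so the resolver (and its schema cache) is shared across views during a recipe run. A small sanity-check sketch, assuming no graph connection:

from datahub.sql_parsing.sqlglot_lineage import create_and_cache_schema_resolver

r1 = create_and_cache_schema_resolver(platform="snowflake", env="PROD")
r2 = create_and_cache_schema_resolver(platform="snowflake", env="PROD")
assert r1 is r2  # cache hit: both names refer to one resolver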
@@ -0,0 +1,6 @@
+connection: "my_connection"
+
+include: "top_10_employee_income_source.view.lkml"
+
+explore: top_10_employee_income_source {
+}
@@ -0,0 +1,18 @@
+view: top_10_employee_income_source {
+  sql_table_name: "db.public.employee"
+    ;;
+  dimension: id {
+    type: number
+    sql: ${TABLE}.id ;;
+  }
+
+  dimension: name {
+    type: string
+    sql: ${TABLE}.name ;;
+  }
+
+  dimension: source {
+    type: string
+    sql: ${TABLE}.source ;;
+  }
+}