Skip to content

Commit

Permalink
refactor(ingest/sql): add _get_view_definition helper method (datahub…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Dec 6, 2024
1 parent b495205 commit bd7649e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import pydantic
import sqlalchemy.dialects.mssql

# This import verifies that the dependencies are available.
from pydantic.fields import Field
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.base import Connection
Expand Down
53 changes: 32 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,8 @@ def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
generate_operations=False,
)
for dataset_name in self._view_definition_cache.keys():
# TODO: Ensure that the lineage generated from the view definition
# matches the dataset_name.
view_definition = self._view_definition_cache[dataset_name]
result = self._run_sql_parser(
dataset_name,
Expand Down Expand Up @@ -1059,6 +1061,20 @@ def loop_views(
exc=e,
)

def _get_view_definition(self, inspector: Inspector, schema: str, view: str) -> str:
    """Return the definition of ``view`` in ``schema`` as a plain string.

    The definition is requested via ``inspector.get_view_definition``.
    An empty string is returned when that call yields ``None`` or raises
    ``NotImplementedError``; any other exception propagates to the caller.
    """
    try:
        raw_definition = inspector.get_view_definition(view, schema)
        # Some dialects return a TextClause instead of a raw string, so
        # every non-None result is coerced with str().
        return "" if raw_definition is None else str(raw_definition)
    except NotImplementedError:
        return ""

def _process_view(
self,
dataset_name: str,
Expand All @@ -1077,7 +1093,10 @@ def _process_view(
columns = inspector.get_columns(view, schema)
except KeyError:
# For certain types of views, we are unable to fetch the list of columns.
self.warn(logger, dataset_name, "unable to get schema for this view")
self.report.warning(
message="Unable to get schema for a view",
context=f"{dataset_name}",
)
schema_metadata = None
else:
schema_fields = self.get_schema_fields(dataset_name, columns, inspector)
Expand All @@ -1091,19 +1110,12 @@ def _process_view(
if self._save_schema_to_resolver():
self.schema_resolver.add_schema_metadata(dataset_urn, schema_metadata)
self.discovered_datasets.add(dataset_name)

description, properties, _ = self.get_table_properties(inspector, schema, view)
try:
view_definition = inspector.get_view_definition(view, schema)
if view_definition is None:
view_definition = ""
else:
# Some dialects return a TextClause instead of a raw string,
# so we need to convert them to a string.
view_definition = str(view_definition)
except NotImplementedError:
view_definition = ""
properties["view_definition"] = view_definition
properties["is_view"] = "True"

view_definition = self._get_view_definition(inspector, schema, view)
properties["view_definition"] = view_definition
if view_definition and self.config.include_view_lineage:
self._view_definition_cache[dataset_name] = view_definition

Expand Down Expand Up @@ -1135,15 +1147,14 @@ def _process_view(
entityUrn=dataset_urn,
aspect=SubTypesClass(typeNames=[DatasetSubTypes.VIEW]),
).as_workunit()
if "view_definition" in properties:
view_definition_string = properties["view_definition"]
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()

view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()

if self.config.domain and self.domain_registry:
yield from get_domain_wu(
Expand Down

0 comments on commit bd7649e

Please sign in to comment.