diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py index 7a2dbda8b4a939..414c1faaa1661a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py @@ -5,8 +5,6 @@ import pydantic import sqlalchemy.dialects.mssql - -# This import verifies that the dependencies are available. from pydantic.fields import Field from sqlalchemy import create_engine, inspect from sqlalchemy.engine.base import Connection diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 64aa8cfc6ef6c7..4e22930e7a2a0b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -582,6 +582,8 @@ def get_view_lineage(self) -> Iterable[MetadataWorkUnit]: generate_operations=False, ) for dataset_name in self._view_definition_cache.keys(): + # TODO: Ensure that the lineage generated from the view definition + # matches the dataset_name. view_definition = self._view_definition_cache[dataset_name] result = self._run_sql_parser( dataset_name, @@ -1059,6 +1061,20 @@ def loop_views( exc=e, ) + def _get_view_definition(self, inspector: Inspector, schema: str, view: str) -> str: + try: + view_definition = inspector.get_view_definition(view, schema) + if view_definition is None: + view_definition = "" + else: + # Some dialects return a TextClause instead of a raw string, + # so we need to convert them to a string. + view_definition = str(view_definition) + except NotImplementedError: + view_definition = "" + + return view_definition + def _process_view( self, dataset_name: str, @@ -1077,7 +1093,10 @@ def _process_view( columns = inspector.get_columns(view, schema) except KeyError: # For certain types of views, we are unable to fetch the list of columns. - self.warn(logger, dataset_name, "unable to get schema for this view") + self.report.warning( + message="Unable to get schema for a view", + context=f"{dataset_name}", + ) schema_metadata = None else: schema_fields = self.get_schema_fields(dataset_name, columns, inspector) @@ -1091,19 +1110,12 @@ def _process_view( if self._save_schema_to_resolver(): self.schema_resolver.add_schema_metadata(dataset_urn, schema_metadata) self.discovered_datasets.add(dataset_name) + description, properties, _ = self.get_table_properties(inspector, schema, view) - try: - view_definition = inspector.get_view_definition(view, schema) - if view_definition is None: - view_definition = "" - else: - # Some dialects return a TextClause instead of a raw string, - # so we need to convert them to a string. - view_definition = str(view_definition) - except NotImplementedError: - view_definition = "" - properties["view_definition"] = view_definition properties["is_view"] = "True" + + view_definition = self._get_view_definition(inspector, schema, view) + properties["view_definition"] = view_definition if view_definition and self.config.include_view_lineage: self._view_definition_cache[dataset_name] = view_definition @@ -1135,15 +1147,14 @@ def _process_view( entityUrn=dataset_urn, aspect=SubTypesClass(typeNames=[DatasetSubTypes.VIEW]), ).as_workunit() - if "view_definition" in properties: - view_definition_string = properties["view_definition"] - view_properties_aspect = ViewPropertiesClass( - materialized=False, viewLanguage="SQL", viewLogic=view_definition_string - ) - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=view_properties_aspect, - ).as_workunit() + + view_properties_aspect = ViewPropertiesClass( + materialized=False, viewLanguage="SQL", viewLogic=view_definition + ) + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=view_properties_aspect, + ).as_workunit() if self.config.domain and self.domain_registry: yield from get_domain_wu(