From a8f0080c08b5c816f0dae9d3bef07ea00220541e Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Fri, 13 Oct 2023 00:14:45 +0200 Subject: [PATCH] feat(ingest/teradata): Teradata source (#8977) --- .../docs/sources/teradata/teradata_pre.md | 28 +++ .../docs/sources/teradata/teradata_recipe.yml | 17 ++ metadata-ingestion/setup.py | 3 + .../datahub/ingestion/source/sql/teradata.py | 228 ++++++++++++++++++ .../testing/check_sql_parser_result.py | 5 +- .../src/datahub/utilities/sqlglot_lineage.py | 5 + .../test_teradata_default_normalization.json | 38 +++ .../unit/sql_parsing/test_sqlglot_lineage.py | 42 ++++ 8 files changed, 365 insertions(+), 1 deletion(-) create mode 100644 metadata-ingestion/docs/sources/teradata/teradata_pre.md create mode 100644 metadata-ingestion/docs/sources/teradata/teradata_recipe.yml create mode 100644 metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py create mode 100644 metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json diff --git a/metadata-ingestion/docs/sources/teradata/teradata_pre.md b/metadata-ingestion/docs/sources/teradata/teradata_pre.md new file mode 100644 index 00000000000000..eb59caa29eb52f --- /dev/null +++ b/metadata-ingestion/docs/sources/teradata/teradata_pre.md @@ -0,0 +1,28 @@ +### Prerequisites +1. Create a user which has access to the database you want to ingest. + ```sql + CREATE USER datahub FROM AS PASSWORD = PERM = 20000000; + ``` +2. Create a user with the following privileges: + ```sql + GRANT SELECT ON dbc.columns TO datahub; + GRANT SELECT ON dbc.databases TO datahub; + GRANT SELECT ON dbc.tables TO datahub; + GRANT SELECT ON DBC.All_RI_ChildrenV TO datahub; + GRANT SELECT ON DBC.ColumnsV TO datahub; + GRANT SELECT ON DBC.IndicesV TO datahub; + GRANT SELECT ON dbc.TableTextV TO datahub; + GRANT SELECT ON dbc.TablesV TO datahub; + GRANT SELECT ON dbc.dbqlogtbl TO datahub; -- if lineage or usage extraction is enabled + ``` + + If you want to run profiling, you need to grant select permission on all the tables you want to profile. + +3. If linege or usage extraction is enabled, please, check if query logging is enabled and it is set to size which +will fit for your queries (the default query text size Teradata captures is max 200 chars) + An example how you can set it for all users: + ```sql + REPLACE QUERY LOGGING LIMIT SQLTEXT=2000 ON ALL; + ``` + See more here about query logging: + [https://docs.teradata.com/r/Teradata-VantageCloud-Lake/Database-Reference/Database-Administration/Tracking-Query-Behavior-with-Database-Query-Logging-Operational-DBAs]() diff --git a/metadata-ingestion/docs/sources/teradata/teradata_recipe.yml b/metadata-ingestion/docs/sources/teradata/teradata_recipe.yml new file mode 100644 index 00000000000000..8cf07ba4c3a01e --- /dev/null +++ b/metadata-ingestion/docs/sources/teradata/teradata_recipe.yml @@ -0,0 +1,17 @@ +pipeline_name: my-teradata-ingestion-pipeline +source: + type: teradata + config: + host_port: "myteradatainstance.teradata.com:1025" + #platform_instance: "myteradatainstance" + username: myuser + password: mypassword + #database_pattern: + # allow: + # - "demo_user" + # ignoreCase: true + include_table_lineage: true + include_usage_statistics: true + stateful_ingestion: + enabled: true +sink: diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 61e7b684682a45..3ea9a2ea61d740 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -373,6 +373,7 @@ # FIXME: I don't think tableau uses sqllineage anymore so we should be able # to remove that dependency. "tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib, + "teradata": sql_common | {"teradatasqlalchemy>=17.20.0.0"}, "trino": sql_common | trino, "starburst-trino-usage": sql_common | usage_common | trino, "nifi": {"requests", "packaging", "requests-gssapi"}, @@ -499,6 +500,7 @@ "s3", "snowflake", "tableau", + "teradata", "trino", "hive", "starburst-trino-usage", @@ -597,6 +599,7 @@ "tableau = datahub.ingestion.source.tableau:TableauSource", "openapi = datahub.ingestion.source.openapi:OpenApiSource", "metabase = datahub.ingestion.source.metabase:MetabaseSource", + "teradata = datahub.ingestion.source.sql.teradata:TeradataSource", "trino = datahub.ingestion.source.sql.trino:TrinoSource", "starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource", "nifi = datahub.ingestion.source.nifi:NifiSource", diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py new file mode 100644 index 00000000000000..dd11cd840bed97 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -0,0 +1,228 @@ +import logging +from dataclasses import dataclass +from typing import Iterable, Optional, Set, Union + +# This import verifies that the dependencies are available. +import teradatasqlalchemy # noqa: F401 +import teradatasqlalchemy.types as custom_types +from pydantic.fields import Field +from sqlalchemy import create_engine +from sqlalchemy.engine import Engine + +from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.time_window_config import BaseTimeWindowConfig +from datahub.emitter.sql_parsing_builder import SqlParsingBuilder +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SourceCapability, + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.graph.client import DataHubGraph +from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type +from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport +from datahub.ingestion.source.sql.two_tier_sql_source import ( + TwoTierSQLAlchemyConfig, + TwoTierSQLAlchemySource, +) +from datahub.ingestion.source.usage.usage_common import BaseUsageConfig +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport +from datahub.ingestion.source_report.time_window import BaseTimeWindowReport +from datahub.metadata.com.linkedin.pegasus2avro.schema import ( + BytesTypeClass, + TimeTypeClass, +) +from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage + +logger: logging.Logger = logging.getLogger(__name__) + +register_custom_type(custom_types.JSON, BytesTypeClass) +register_custom_type(custom_types.INTERVAL_DAY, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_DAY_TO_SECOND, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_DAY_TO_MINUTE, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_DAY_TO_HOUR, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_SECOND, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_MINUTE, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_MINUTE_TO_SECOND, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_HOUR, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_HOUR_TO_MINUTE, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_HOUR_TO_SECOND, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_MONTH, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_YEAR, TimeTypeClass) +register_custom_type(custom_types.INTERVAL_YEAR_TO_MONTH, TimeTypeClass) +register_custom_type(custom_types.MBB, BytesTypeClass) +register_custom_type(custom_types.MBR, BytesTypeClass) +register_custom_type(custom_types.GEOMETRY, BytesTypeClass) +register_custom_type(custom_types.TDUDT, BytesTypeClass) +register_custom_type(custom_types.XML, BytesTypeClass) + + +@dataclass +class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport): + num_queries_parsed: int = 0 + num_table_parse_failures: int = 0 + + +class BaseTeradataConfig(TwoTierSQLAlchemyConfig): + scheme = Field(default="teradatasql", description="database scheme") + + +class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig): + database_pattern = Field( + default=AllowDenyPattern(deny=["dbc"]), + description="Regex patterns for databases to filter in ingestion.", + ) + include_table_lineage = Field( + default=False, + description="Whether to include table lineage in the ingestion. " + "This requires to have the table lineage feature enabled.", + ) + + usage: BaseUsageConfig = Field( + description="The usage config to use when generating usage statistics", + default=BaseUsageConfig(), + ) + + use_schema_resolver: bool = Field( + default=True, + description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.", + hidden_from_docs=True, + ) + + default_db: Optional[str] = Field( + default=None, + description="The default database to use for unqualified table names", + ) + + include_usage_statistics: bool = Field( + default=False, + description="Generate usage statistic.", + ) + + +@platform_name("Teradata") +@config_class(TeradataConfig) +@support_status(SupportStatus.TESTING) +@capability(SourceCapability.DOMAINS, "Enabled by default") +@capability(SourceCapability.CONTAINERS, "Enabled by default") +@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") +@capability(SourceCapability.DELETION_DETECTION, "Optionally enabled via configuration") +@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") +@capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration") +@capability(SourceCapability.LINEAGE_FINE, "Optionally enabled via configuration") +@capability(SourceCapability.USAGE_STATS, "Optionally enabled via configuration") +class TeradataSource(TwoTierSQLAlchemySource): + """ + This plugin extracts the following: + + - Metadata for databases, schemas, views, and tables + - Column types associated with each table + - Table, row, and column statistics via optional SQL profiling + """ + + config: TeradataConfig + + LINEAGE_QUERY: str = """SELECT ProcID, UserName as "user", StartTime AT TIME ZONE 'GMT' as "timestamp", DefaultDatabase as default_database, QueryText as query + FROM "DBC".DBQLogTbl + where ErrorCode = 0 + and QueryText like 'create table demo_user.test_lineage%' + and "timestamp" >= TIMESTAMP '{start_time}' + and "timestamp" < TIMESTAMP '{end_time}' + """ + urns: Optional[Set[str]] + + def __init__(self, config: TeradataConfig, ctx: PipelineContext): + super().__init__(config, ctx, "teradata") + + self.report: TeradataReport = TeradataReport() + self.graph: Optional[DataHubGraph] = ctx.graph + + if self.graph: + if self.config.use_schema_resolver: + self.schema_resolver = ( + self.graph.initialize_schema_resolver_from_datahub( + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) + ) + self.urns = self.schema_resolver.get_urns() + else: + self.schema_resolver = self.graph._make_schema_resolver( + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) + self.urns = None + else: + self.schema_resolver = SchemaResolver( + platform=self.platform, + platform_instance=self.config.platform_instance, + graph=None, + env=self.config.env, + ) + self.urns = None + + self.builder: SqlParsingBuilder = SqlParsingBuilder( + usage_config=self.config.usage + if self.config.include_usage_statistics + else None, + generate_lineage=self.config.include_table_lineage, + generate_usage_statistics=self.config.include_usage_statistics, + generate_operations=self.config.usage.include_operational_stats, + ) + + @classmethod + def create(cls, config_dict, ctx): + config = TeradataConfig.parse_obj(config_dict) + return cls(config, ctx) + + def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]: + engine = self.get_metadata_engine() + for entry in engine.execute( + self.LINEAGE_QUERY.format( + start_time=self.config.start_time, end_time=self.config.end_time + ) + ): + self.report.num_queries_parsed += 1 + if self.report.num_queries_parsed % 1000 == 0: + logger.info(f"Parsed {self.report.num_queries_parsed} queries") + + result = sqlglot_lineage( + sql=entry.query, + schema_resolver=self.schema_resolver, + default_db=None, + default_schema=entry.default_database + if entry.default_database + else self.config.default_db, + ) + if result.debug_info.table_error: + logger.debug( + f"Error parsing table lineage, {result.debug_info.table_error}" + ) + self.report.num_table_parse_failures += 1 + continue + + yield from self.builder.process_sql_parsing_result( + result, + query=entry.query, + query_timestamp=entry.timestamp, + user=f"urn:li:corpuser:{entry.user}", + include_urns=self.urns, + ) + + def get_metadata_engine(self) -> Engine: + url = self.config.get_sql_alchemy_url() + logger.debug(f"sql_alchemy_url={url}") + return create_engine(url, **self.config.options) + + def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: + yield from super().get_workunits_internal() + if self.config.include_table_lineage or self.config.include_usage_statistics: + self.report.report_ingestion_stage_start("audit log extraction") + yield from self.get_audit_log_mcps() + yield from self.builder.gen_workunits() diff --git a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py index 8516a7054a9cdc..b3b1331db768bb 100644 --- a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py +++ b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py @@ -70,11 +70,14 @@ def assert_sql_result( sql: str, *, dialect: str, + platform_instance: Optional[str] = None, expected_file: pathlib.Path, schemas: Optional[Dict[str, SchemaInfo]] = None, **kwargs: Any, ) -> None: - schema_resolver = SchemaResolver(platform=dialect) + schema_resolver = SchemaResolver( + platform=dialect, platform_instance=platform_instance + ) if schemas: for urn, schema in schemas.items(): schema_resolver.add_raw_schema_info(urn, schema) diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index 349eb40a5e865a..c830ec8c02fd44 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -482,6 +482,11 @@ def _column_level_lineage( # noqa: C901 # Our snowflake source lowercases column identifiers, so we are forced # to do fuzzy (case-insensitive) resolution instead of exact resolution. "snowflake", + # Teradata column names are case-insensitive. + # A name, even when enclosed in double quotation marks, is not case sensitive. For example, CUSTOMER and Customer are the same. + # See more below: + # https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/acreldb/n0ejgx4895bofnn14rlguktfx5r3.htm + "teradata", } sqlglot_db_schema = sqlglot.MappingSchema( diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json new file mode 100644 index 00000000000000..b0351a7e07ad2d --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json @@ -0,0 +1,38 @@ +{ + "query_type": "CREATE", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_diagnoses,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_features,PROD)" + ], + "out_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)" + ], + "column_lineage": [ + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)", + "column": "PatientId", + "native_column_type": "INTEGER()" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_diagnoses,PROD)", + "column": "PatientId" + } + ] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)", + "column": "BMI", + "native_column_type": "FLOAT()" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_features,PROD)", + "column": "BMI" + } + ] + } + ] +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index bb6e5f15817547..059add8db67e48 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -630,3 +630,45 @@ def test_snowflake_column_cast(): # TODO: Add a test for setting platform_instance or env + + +def test_teradata_default_normalization(): + assert_sql_result( + """ +create table demo_user.test_lineage2 as + ( + select + ppd.PatientId, + ppf.bmi + from + demo_user.pima_patient_features ppf + join demo_user.pima_patient_diagnoses ppd on + ppd.PatientId = ppf.PatientId + ) with data; +""", + dialect="teradata", + default_schema="dbc", + platform_instance="myteradata", + schemas={ + "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_diagnoses,PROD)": { + "HasDiabetes": "INTEGER()", + "PatientId": "INTEGER()", + }, + "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_features,PROD)": { + "Age": "INTEGER()", + "BMI": "FLOAT()", + "BloodP": "INTEGER()", + "DiPedFunc": "FLOAT()", + "NumTimesPrg": "INTEGER()", + "PatientId": "INTEGER()", + "PlGlcConc": "INTEGER()", + "SkinThick": "INTEGER()", + "TwoHourSerIns": "INTEGER()", + }, + "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)": { + "BMI": "FLOAT()", + "PatientId": "INTEGER()", + }, + }, + expected_file=RESOURCE_DIR / "test_teradata_default_normalization.json", + )