diff --git a/datahub-web-react/src/app/entity/shared/EntityDropdown/UpdateDeprecationModal.tsx b/datahub-web-react/src/app/entity/shared/EntityDropdown/UpdateDeprecationModal.tsx index 512735e60b2c3f..6ae893e12575fd 100644 --- a/datahub-web-react/src/app/entity/shared/EntityDropdown/UpdateDeprecationModal.tsx +++ b/datahub-web-react/src/app/entity/shared/EntityDropdown/UpdateDeprecationModal.tsx @@ -27,7 +27,7 @@ export const UpdateDeprecationModal = ({ urns, onClose, refetch }: Props) => { resources: [...urns.map((urn) => ({ resourceUrn: urn }))], deprecated: true, note: formData.note, - decommissionTime: formData.decommissionTime && formData.decommissionTime.unix(), + decommissionTime: formData.decommissionTime && formData.decommissionTime.unix() * 1000, }, }, }); diff --git a/datahub-web-react/src/app/entity/shared/components/styled/DeprecationPill.tsx b/datahub-web-react/src/app/entity/shared/components/styled/DeprecationPill.tsx index ffc32c15382597..f60a74247ebcc2 100644 --- a/datahub-web-react/src/app/entity/shared/components/styled/DeprecationPill.tsx +++ b/datahub-web-react/src/app/entity/shared/components/styled/DeprecationPill.tsx @@ -83,15 +83,24 @@ export const DeprecationPill = ({ deprecation, urn, refetch, showUndeprecate }: * Deprecation Decommission Timestamp */ const localeTimezone = getLocaleTimezone(); + + let decommissionTimeSeconds; + if (deprecation.decommissionTime) { + if (deprecation.decommissionTime < 943920000000) { + // Time is set in way past if it was milli-second so considering this as set in seconds + decommissionTimeSeconds = deprecation.decommissionTime; + } else { + decommissionTimeSeconds = deprecation.decommissionTime / 1000; + } + } const decommissionTimeLocal = - (deprecation.decommissionTime && + (decommissionTimeSeconds && `Scheduled to be decommissioned on ${moment - .unix(deprecation.decommissionTime) + .unix(decommissionTimeSeconds) .format('DD/MMM/YYYY')} (${localeTimezone})`) || undefined; const decommissionTimeGMT = - deprecation.decommissionTime && - moment.unix(deprecation.decommissionTime).utc().format('dddd, DD/MMM/YYYY HH:mm:ss z'); + decommissionTimeSeconds && moment.unix(decommissionTimeSeconds).utc().format('dddd, DD/MMM/YYYY HH:mm:ss z'); const hasDetails = deprecation.note !== '' || deprecation.decommissionTime !== null; const isDividerNeeded = deprecation.note !== '' && deprecation.decommissionTime !== null; diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/trino.ts b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/trino.ts index ed3c7ee73b8195..1af84c0131d4a7 100644 --- a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/trino.ts +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/trino.ts @@ -6,7 +6,7 @@ export const TRINO_HOST_PORT: RecipeField = { name: 'host_port', label: 'Host and Port', tooltip: - "The host and port where Trino is running. For example, 'trino-server:5432'. Note: this host must be accessible on the network where DataHub is running (or allowed via an IP Allow List, AWS PrivateLink, etc).", + "The host (without protocol and ://) and port where Trino is running. For example, 'trino-server:5432'. Note: this host must be accessible on the network where DataHub is running (or allowed via an IP Allow List, AWS PrivateLink, etc).", type: FieldType.TEXT, fieldPath: 'source.config.host_port', placeholder: 'trino-server:5432', diff --git a/datahub-web-react/src/app/settings/SettingsPage.tsx b/datahub-web-react/src/app/settings/SettingsPage.tsx index 69d4eb2b10b4d5..e0a15c73a626d0 100644 --- a/datahub-web-react/src/app/settings/SettingsPage.tsx +++ b/datahub-web-react/src/app/settings/SettingsPage.tsx @@ -25,11 +25,11 @@ import ManagePosts from './posts/ManagePosts'; const PageContainer = styled.div` display: flex; overflow: auto; + flex: 1; `; const SettingsBarContainer = styled.div` padding-top: 20px; - max-height: 100vh; border-right: 1px solid ${ANTD_GRAY[5]}; display: flex; flex-direction: column; diff --git a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py index cedaa4fbbd7f6f..ea5ebf705707a1 100644 --- a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py +++ b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py @@ -195,6 +195,9 @@ def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: upstreams.append(edge.gen_upstream_aspect()) fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects()) + if not upstreams: + continue + upstream_lineage = UpstreamLineageClass( upstreams=sorted(upstreams, key=lambda x: x.dataset), fineGrainedLineages=sorted( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 94df0a4f8a166e..919ba5a4b285a9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -895,6 +895,7 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No (upstream, node.dbt_name) for node in all_nodes_map.values() for upstream in node.upstream_nodes + if upstream in all_nodes_map ), ): node = all_nodes_map[dbt_name] diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index 9d7c9726127779..2bd469b3f9bcdd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -1982,9 +1982,16 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 self.reporter, ) - # some views can be mentioned by multiple 'include' statements and can be included via different connections. - # So this set is used to prevent creating duplicate events + # Some views can be mentioned by multiple 'include' statements and can be included via different connections. + + # This map is used to keep track of which views files have already been processed + # for a connection in order to prevent creating duplicate events. + # Key: connection name, Value: view file paths processed_view_map: Dict[str, Set[str]] = {} + + # This map is used to keep track of the connection that a view is processed with. + # Key: view unique identifier - determined by variables present in config `view_naming_pattern` + # Value: Tuple(model file name, connection name) view_connection_map: Dict[str, Tuple[str, str]] = {} # The ** means "this directory and all subdirectories", and hence should @@ -2148,13 +2155,17 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 if self.source_config.view_pattern.allowed( maybe_looker_view.id.view_name ): + view_urn = maybe_looker_view.id.get_urn( + self.source_config + ) view_connection_mapping = view_connection_map.get( - maybe_looker_view.id.view_name + view_urn ) if not view_connection_mapping: - view_connection_map[ - maybe_looker_view.id.view_name - ] = (model_name, model.connection) + view_connection_map[view_urn] = ( + model_name, + model.connection, + ) # first time we are discovering this view logger.debug( f"Generating MCP for view {raw_view['name']}" diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index 8aeb1e50cd0b30..4ea8dbe236c53e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -1,14 +1,32 @@ import logging +from collections import defaultdict from dataclasses import dataclass from datetime import datetime -from typing import Iterable, Optional, Union +from functools import lru_cache +from itertools import groupby +from typing import ( + Any, + Dict, + Iterable, + List, + MutableMapping, + Optional, + Set, + Tuple, + Union, +) # This import verifies that the dependencies are available. import teradatasqlalchemy # noqa: F401 import teradatasqlalchemy.types as custom_types from pydantic.fields import Field -from sqlalchemy import create_engine +from sqlalchemy import create_engine, inspect from sqlalchemy.engine import Engine +from sqlalchemy.engine.base import Connection +from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.sql.expression import text +from teradatasqlalchemy.dialect import TeradataDialect +from teradatasqlalchemy.options import configure from datahub.configuration.common import AllowDenyPattern from datahub.configuration.time_window_config import BaseTimeWindowConfig @@ -22,9 +40,11 @@ platform_name, support_status, ) +from datahub.ingestion.api.source_helpers import auto_lowercase_urns from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type +from datahub.ingestion.source.sql.sql_config import SQLCommonConfig from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport from datahub.ingestion.source.sql.two_tier_sql_source import ( TwoTierSQLAlchemyConfig, @@ -33,6 +53,7 @@ from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.ingestion.source_report.time_window import BaseTimeWindowReport +from datahub.metadata._schema_classes import SchemaMetadataClass from datahub.metadata.com.linkedin.pegasus2avro.schema import ( BytesTypeClass, TimeTypeClass, @@ -62,6 +83,249 @@ register_custom_type(custom_types.XML, BytesTypeClass) +@dataclass +class TeradataTable: + database: str + name: str + description: Optional[str] + object_type: str + create_timestamp: datetime + last_alter_name: Optional[str] + last_alter_timestamp: Optional[datetime] + request_text: Optional[str] + + +# lru cache is set to 1 which work only in single threaded environment but it keeps the memory footprint lower +@lru_cache(maxsize=1) +def get_schema_columns( + self: Any, connection: Connection, dbc_columns: str, schema: str +) -> Dict[str, List[Any]]: + columns: Dict[str, List[Any]] = {} + columns_query = f"select * from dbc.{dbc_columns} where DatabaseName (NOT CASESPECIFIC) = '{schema}' (NOT CASESPECIFIC) order by TableName, ColumnId" + rows = connection.execute(text(columns_query)).fetchall() + for row in rows: + row_mapping = row._mapping + if row_mapping.TableName not in columns: + columns[row_mapping.TableName] = [] + + columns[row_mapping.TableName].append(row_mapping) + + return columns + + +# lru cache is set to 1 which work only in single threaded environment but it keeps the memory footprint lower +@lru_cache(maxsize=1) +def get_schema_pk_constraints( + self: Any, connection: Connection, schema: str +) -> Dict[str, List[Any]]: + dbc_indices = "IndicesV" + "X" if configure.usexviews else "IndicesV" + primary_keys: Dict[str, List[Any]] = {} + stmt = f"select * from dbc.{dbc_indices} where DatabaseName (NOT CASESPECIFIC) = '{schema}' (NOT CASESPECIFIC) and IndexType = 'K' order by IndexNumber" + rows = connection.execute(text(stmt)).fetchall() + for row in rows: + row_mapping = row._mapping + if row_mapping.TableName not in primary_keys: + primary_keys[row_mapping.TableName] = [] + + primary_keys[row_mapping.TableName].append(row_mapping) + + return primary_keys + + +def optimized_get_pk_constraint( + self: Any, + connection: Connection, + table_name: str, + schema: Optional[str] = None, + **kw: Dict[str, Any], +) -> Dict: + """ + Override + TODO: Check if we need PRIMARY Indices or PRIMARY KEY Indices + TODO: Check for border cases (No PK Indices) + """ + + if schema is None: + schema = self.default_schema_name + + # Default value for 'usexviews' is False so use dbc.IndicesV by default + # dbc_indices = self.__get_xviews_obj("IndicesV") + + # table_obj = table( + # dbc_indices, column("ColumnName"), column("IndexName"), schema="dbc" + # ) + + res = [] + pk_keys = self.get_schema_pk_constraints(connection, schema) + res = pk_keys.get(table_name, []) + + index_columns = list() + index_name = None + + for index_column in res: + index_columns.append(self.normalize_name(index_column.ColumnName)) + index_name = self.normalize_name( + index_column.IndexName + ) # There should be just one IndexName + + return {"constrained_columns": index_columns, "name": index_name} + + +def optimized_get_columns( + self: Any, + connection: Connection, + table_name: str, + schema: Optional[str] = None, + tables_cache: MutableMapping[str, List[TeradataTable]] = {}, + use_qvci: bool = False, + **kw: Dict[str, Any], +) -> List[Dict]: + if schema is None: + schema = self.default_schema_name + + # Using 'help schema.table.*' statements has been considered. + # The DBC.ColumnsV provides the default value which is not available + # with the 'help column' commands result. + + td_table: Optional[TeradataTable] = None + # Check if the object is a view + for t in tables_cache[schema]: + if t.name == table_name: + td_table = t + break + + if td_table is None: + logger.warning( + f"Table {table_name} not found in cache for schema {schema}, not getting columns" + ) + return [] + + res = [] + if td_table.object_type == "View" and not use_qvci: + # Volatile table definition is not stored in the dictionary. + # We use the 'help schema.table.*' command instead to get information for all columns. + # We have to do the same for views since we need the type information + # which is not available in dbc.ColumnsV. + res = self._get_column_help(connection, schema, table_name, column_name=None) + + # If this is a view, get types for individual columns (dbc.ColumnsV won't have types for view columns). + # For a view or a volatile table, we have to set the default values as the 'help' command does not have it. + col_info_list = [] + for r in res: + updated_column_info_dict = self._update_column_help_info(r._mapping) + col_info_list.append(dict(r._mapping, **(updated_column_info_dict))) + res = col_info_list + else: + # Default value for 'usexviews' is False so use dbc.ColumnsV by default + dbc_columns = "columnsQV" if use_qvci else "columnsV" + dbc_columns = dbc_columns + "X" if configure.usexviews else dbc_columns + res = self.get_schema_columns(connection, dbc_columns, schema).get( + table_name, [] + ) + + final_column_info = [] + # Don't care about ART tables now + # Ignore the non-functional column in a PTI table + for row in res: + col_info = self._get_column_info(row) + if "TSColumnType" in col_info and col_info["TSColumnType"] is not None: + if ( + col_info["ColumnName"] == "TD_TIMEBUCKET" + and col_info["TSColumnType"].strip() == "TB" + ): + continue + final_column_info.append(col_info) + + return final_column_info + + +# lru cache is set to 1 which work only in single threaded environment but it keeps the memory footprint lower +@lru_cache(maxsize=1) +def get_schema_foreign_keys( + self: Any, connection: Connection, schema: str +) -> Dict[str, List[Any]]: + dbc_child_parent_table = ( + "All_RI_ChildrenV" + "X" if configure.usexviews else "All_RI_ChildrenV" + ) + foreign_keys: Dict[str, List[Any]] = {} + stmt = f""" + SELECT dbc."All_RI_ChildrenV"."ChildDB", dbc."All_RI_ChildrenV"."ChildTable", dbc."All_RI_ChildrenV"."IndexID", dbc."{dbc_child_parent_table}"."IndexName", dbc."{dbc_child_parent_table}"."ChildKeyColumn", dbc."{dbc_child_parent_table}"."ParentDB", dbc."{dbc_child_parent_table}"."ParentTable", dbc."{dbc_child_parent_table}"."ParentKeyColumn" + FROM dbc."{dbc_child_parent_table}" + WHERE ChildDB = '{schema}' ORDER BY "IndexID" ASC + """ + rows = connection.execute(text(stmt)).fetchall() + for row in rows: + row_mapping = row._mapping + if row_mapping.ChildTable not in foreign_keys: + foreign_keys[row_mapping.ChildTable] = [] + + foreign_keys[row_mapping.ChildTable].append(row_mapping) + + return foreign_keys + + +def optimized_get_foreign_keys(self, connection, table_name, schema=None, **kw): + """ + Overrides base class method + """ + + if schema is None: + schema = self.default_schema_name + # Default value for 'usexviews' is False so use DBC.All_RI_ChildrenV by default + res = self.get_schema_foreign_keys(connection, schema).get(table_name, []) + + def grouper(fk_row): + return { + "name": fk_row.IndexName or fk_row.IndexID, # ID if IndexName is None + "schema": fk_row.ParentDB, + "table": fk_row.ParentTable, + } + + # TODO: Check if there's a better way + fk_dicts = list() + for constraint_info, constraint_cols in groupby(res, grouper): + fk_dict = { + "name": str(constraint_info["name"]), + "constrained_columns": list(), + "referred_table": constraint_info["table"], + "referred_schema": constraint_info["schema"], + "referred_columns": list(), + } + + for constraint_col in constraint_cols: + fk_dict["constrained_columns"].append( + self.normalize_name(constraint_col.ChildKeyColumn) + ) + fk_dict["referred_columns"].append( + self.normalize_name(constraint_col.ParentKeyColumn) + ) + + fk_dicts.append(fk_dict) + + return fk_dicts + + +def optimized_get_view_definition( + self: Any, + connection: Connection, + view_name: str, + schema: Optional[str] = None, + tables_cache: MutableMapping[str, List[TeradataTable]] = {}, + **kw: Dict[str, Any], +) -> Optional[str]: + if schema is None: + schema = self.default_schema_name + + if schema not in tables_cache: + return None + + for table in tables_cache[schema]: + if table.name == view_name: + return self.normalize_name(table.request_text) + + return None + + @dataclass class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport): num_queries_parsed: int = 0 @@ -74,8 +338,62 @@ class BaseTeradataConfig(TwoTierSQLAlchemyConfig): class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig): + databases: Optional[List[str]] = Field( + default=None, + description=( + "List of databases to ingest. If not specified, all databases will be ingested." + " Even if this is specified, databases will still be filtered by `database_pattern`." + ), + ) + database_pattern = Field( - default=AllowDenyPattern(deny=["dbc"]), + default=AllowDenyPattern( + deny=[ + "All", + "Crashdumps", + "Default", + "DemoNow_Monitor", + "EXTUSER", + "External_AP", + "GLOBAL_FUNCTIONS", + "LockLogShredder", + "PUBLIC", + "SQLJ", + "SYSBAR", + "SYSJDBC", + "SYSLIB", + "SYSSPATIAL", + "SYSUDTLIB", + "SYSUIF", + "SysAdmin", + "Sys_Calendar", + "SystemFe", + "TDBCMgmt", + "TDMaps", + "TDPUSER", + "TDQCD", + "TDStats", + "TD_ANALYTICS_DB", + "TD_SERVER_DB", + "TD_SYSFNLIB", + "TD_SYSGPL", + "TD_SYSXML", + "TDaaS_BAR", + "TDaaS_DB", + "TDaaS_Maint", + "TDaaS_Monitor", + "TDaaS_Support", + "TDaaS_TDBCMgmt1", + "TDaaS_TDBCMgmt2", + "dbcmngr", + "mldb", + "system", + "tapidb", + "tdwm", + "val", + "dbc", + ] + ), description="Regex patterns for databases to filter in ingestion.", ) include_table_lineage = Field( @@ -84,6 +402,11 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig): "This requires to have the table lineage feature enabled.", ) + include_view_lineage = Field( + default=True, + description="Whether to include view lineage in the ingestion. " + "This requires to have the view lineage feature enabled.", + ) usage: BaseUsageConfig = Field( description="The usage config to use when generating usage statistics", default=BaseUsageConfig(), @@ -99,6 +422,16 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig): description="Generate usage statistic.", ) + use_file_backed_cache: bool = Field( + default=True, + description="Whether to use a file backed cache for the view definitions.", + ) + + use_qvci: bool = Field( + default=False, + description="Whether to use QVCI to get column information. This is faster but requires to have QVCI enabled.", + ) + @platform_name("Teradata") @config_class(TeradataConfig) @@ -122,13 +455,116 @@ class TeradataSource(TwoTierSQLAlchemySource): config: TeradataConfig - LINEAGE_QUERY: str = """SELECT ProcID, UserName as "user", StartTime AT TIME ZONE 'GMT' as "timestamp", DefaultDatabase as default_database, QueryText as query - FROM "DBC".DBQLogTbl - where ErrorCode = 0 - and QueryText like 'create table demo_user.test_lineage%' - and "timestamp" >= TIMESTAMP '{start_time}' - and "timestamp" < TIMESTAMP '{end_time}' - """ + LINEAGE_QUERY_DATABASE_FILTER: str = """and default_database IN ({databases})""" + + LINEAGE_TIMESTAMP_BOUND_QUERY: str = """ + SELECT MIN(CollectTimeStamp) as "min_ts", MAX(CollectTimeStamp) as "max_ts" from DBC.DBQLogTbl + """.strip() + + QUERY_TEXT_QUERY: str = """ + SELECT + s.QueryID as "query_id", + UserName as "user", + StartTime AT TIME ZONE 'GMT' as "timestamp", + DefaultDatabase as default_database, + s.SqlTextInfo as "query_text", + s.SqlRowNo as "row_no" + FROM "DBC".DBQLogTbl as l + JOIN "DBC".DBQLSqlTbl as s on s.QueryID = l.QueryID + WHERE + l.ErrorCode = 0 + AND l.statementtype not in ( + 'Unrecognized type', + 'Create Database/User', + 'Help', + 'Modify Database', + 'Drop Table', + 'Show', + 'Not Applicable', + 'Grant', + 'Abort', + 'Database', + 'Flush Query Logging', + 'Null', + 'Begin/End DBQL', + 'Revoke' + ) + and "timestamp" >= TIMESTAMP '{start_time}' + and "timestamp" < TIMESTAMP '{end_time}' + and s.CollectTimeStamp >= TIMESTAMP '{start_time}' + and default_database not in ('DEMONOW_MONITOR') + {databases_filter} + ORDER BY "query_id", "row_no" + """.strip() + + TABLES_AND_VIEWS_QUERY: str = """ +SELECT + t.DatabaseName, + t.TableName as name, + t.CommentString as description, + CASE t.TableKind + WHEN 'I' THEN 'Join index' + WHEN 'N' THEN 'Hash index' + WHEN 'T' THEN 'Table' + WHEN 'V' THEN 'View' + WHEN 'O' THEN 'NoPI Table' + WHEN 'Q' THEN 'Queue table' + END AS object_type, + t.CreateTimeStamp, + t.LastAlterName, + t.LastAlterTimeStamp, + t.RequestText +FROM dbc.Tables t +WHERE DatabaseName NOT IN ( + 'All', + 'Crashdumps', + 'Default', + 'DemoNow_Monitor', + 'EXTUSER', + 'External_AP', + 'GLOBAL_FUNCTIONS', + 'LockLogShredder', + 'PUBLIC', + 'SQLJ', + 'SYSBAR', + 'SYSJDBC', + 'SYSLIB', + 'SYSSPATIAL', + 'SYSUDTLIB', + 'SYSUIF', + 'SysAdmin', + 'Sys_Calendar', + 'SystemFe', + 'TDBCMgmt', + 'TDMaps', + 'TDPUSER', + 'TDQCD', + 'TDStats', + 'TD_ANALYTICS_DB', + 'TD_SERVER_DB', + 'TD_SYSFNLIB', + 'TD_SYSGPL', + 'TD_SYSXML', + 'TDaaS_BAR', + 'TDaaS_DB', + 'TDaaS_Maint', + 'TDaaS_Monitor', + 'TDaaS_Support', + 'TDaaS_TDBCMgmt1', + 'TDaaS_TDBCMgmt2', + 'dbcmngr', + 'mldb', + 'system', + 'tapidb', + 'tdwm', + 'val', + 'dbc' +) +AND t.TableKind in ('T', 'V', 'Q', 'O') +ORDER by DatabaseName, TableName; + """.strip() + + _tables_cache: MutableMapping[str, List[TeradataTable]] = defaultdict(list) def __init__(self, config: TeradataConfig, ctx: PipelineContext): super().__init__(config, ctx, "teradata") @@ -145,36 +581,246 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): generate_operations=self.config.usage.include_operational_stats, ) - self.schema_resolver = SchemaResolver( - platform=self.platform, - platform_instance=self.config.platform_instance, - graph=None, - env=self.config.env, - ) + self.schema_resolver = self._init_schema_resolver() + + if self.config.include_tables or self.config.include_views: + self.cache_tables_and_views() + logger.info(f"Found {len(self._tables_cache)} tables and views") + setattr(self, "loop_tables", self.cached_loop_tables) # noqa: B010 + setattr(self, "loop_views", self.cached_loop_views) # noqa: B010 + setattr( # noqa: B010 + self, "get_table_properties", self.cached_get_table_properties + ) + + tables_cache = self._tables_cache + setattr( # noqa: B010 + TeradataDialect, + "get_columns", + lambda self, connection, table_name, schema=None, use_qvci=self.config.use_qvci, **kw: optimized_get_columns( + self, + connection, + table_name, + schema, + tables_cache=tables_cache, + use_qvci=use_qvci, + **kw, + ), + ) + + setattr( # noqa: B010 + TeradataDialect, + "get_pk_constraint", + lambda self, connection, table_name, schema=None, **kw: optimized_get_pk_constraint( + self, connection, table_name, schema, **kw + ), + ) + + setattr( # noqa: B010 + TeradataDialect, + "get_foreign_keys", + lambda self, connection, table_name, schema=None, **kw: optimized_get_foreign_keys( + self, connection, table_name, schema, **kw + ), + ) + + setattr( # noqa: B010 + TeradataDialect, + "get_schema_columns", + lambda self, connection, dbc_columns, schema: get_schema_columns( + self, connection, dbc_columns, schema + ), + ) + + setattr( # noqa: B010 + TeradataDialect, + "get_view_definition", + lambda self, connection, view_name, schema=None, **kw: optimized_get_view_definition( + self, connection, view_name, schema, tables_cache=tables_cache, **kw + ), + ) + + setattr( # noqa: B010 + TeradataDialect, + "get_schema_pk_constraints", + lambda self, connection, schema: get_schema_pk_constraints( + self, connection, schema + ), + ) + + setattr( # noqa: B010 + TeradataDialect, + "get_schema_foreign_keys", + lambda self, connection, schema: get_schema_foreign_keys( + self, connection, schema + ), + ) + else: + logger.info( + "Disabling stale entity removal as tables and views are disabled" + ) + if self.config.stateful_ingestion: + self.config.stateful_ingestion.remove_stale_metadata = False @classmethod def create(cls, config_dict, ctx): config = TeradataConfig.parse_obj(config_dict) return cls(config, ctx) - def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]: + def _init_schema_resolver(self) -> SchemaResolver: + if not self.config.include_tables or not self.config.include_views: + if self.ctx.graph: + return self.ctx.graph.initialize_schema_resolver_from_datahub( + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) + else: + logger.warning( + "Failed to load schema info from DataHub as DataHubGraph is missing.", + ) + return SchemaResolver( + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) + + def get_inspectors(self): + # This method can be overridden in the case that you want to dynamically + # run on multiple databases. + url = self.config.get_sql_alchemy_url() + logger.debug(f"sql_alchemy_url={url}") + engine = create_engine(url, **self.config.options) + with engine.connect() as conn: + inspector = inspect(conn) + if self.config.database and self.config.database != "": + databases = [self.config.database] + elif self.config.databases: + databases = self.config.databases + else: + databases = inspector.get_schema_names() + for db in databases: + if self.config.database_pattern.allowed(db): + # url = self.config.get_sql_alchemy_url(current_db=db) + # with create_engine(url, **self.config.options).connect() as conn: + # inspector = inspect(conn) + inspector._datahub_database = db + yield inspector + + def get_db_name(self, inspector: Inspector) -> str: + if hasattr(inspector, "_datahub_database"): + return inspector._datahub_database + + engine = inspector.engine + + if engine and hasattr(engine, "url") and hasattr(engine.url, "database"): + return str(engine.url.database).strip('"') + else: + raise Exception("Unable to get database name from Sqlalchemy inspector") + + def cached_loop_tables( # noqa: C901 + self, + inspector: Inspector, + schema: str, + sql_config: SQLCommonConfig, + ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: + setattr( # noqa: B010 + inspector, + "get_table_names", + lambda schema: [ + i.name + for i in filter( + lambda t: t.object_type != "View", self._tables_cache[schema] + ) + ], + ) + yield from super().loop_tables(inspector, schema, sql_config) + + def cached_get_table_properties( + self, inspector: Inspector, schema: str, table: str + ) -> Tuple[Optional[str], Dict[str, str], Optional[str]]: + description: Optional[str] = None + properties: Dict[str, str] = {} + + # The location cannot be fetched generically, but subclasses may override + # this method and provide a location. + location: Optional[str] = None + + for entry in self._tables_cache[schema]: + if entry.name == table: + description = entry.description + if entry.object_type == "View" and entry.request_text: + properties["view_definition"] = entry.request_text + break + return description, properties, location + + def cached_loop_views( # noqa: C901 + self, + inspector: Inspector, + schema: str, + sql_config: SQLCommonConfig, + ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: + setattr( # noqa: B010 + inspector, + "get_view_names", + lambda schema: [ + i.name + for i in filter( + lambda t: t.object_type == "View", self._tables_cache[schema] + ) + ], + ) + yield from super().loop_views(inspector, schema, sql_config) + + def cache_tables_and_views(self) -> None: engine = self.get_metadata_engine() - for entry in engine.execute( - self.LINEAGE_QUERY.format( - start_time=self.config.start_time, end_time=self.config.end_time + for entry in engine.execute(self.TABLES_AND_VIEWS_QUERY): + table = TeradataTable( + database=entry.DatabaseName.strip(), + name=entry.name.strip(), + description=entry.description.strip() if entry.description else None, + object_type=entry.object_type, + create_timestamp=entry.CreateTimeStamp, + last_alter_name=entry.LastAlterName, + last_alter_timestamp=entry.LastAlterTimeStamp, + request_text=entry.RequestText.strip() + if entry.object_type == "View" and entry.RequestText + else None, ) - ): + if table.database not in self._tables_cache: + self._tables_cache[table.database] = [] + + self._tables_cache[table.database].append(table) + + def get_audit_log_mcps(self, urns: Set[str]) -> Iterable[MetadataWorkUnit]: + engine = self.get_metadata_engine() + for entry in engine.execute(self._make_lineage_query()): self.report.num_queries_parsed += 1 if self.report.num_queries_parsed % 1000 == 0: logger.info(f"Parsed {self.report.num_queries_parsed} queries") yield from self.gen_lineage_from_query( - query=entry.query, + query=entry.query_text, default_database=entry.default_database, timestamp=entry.timestamp, user=entry.user, - is_view_ddl=False, + urns=urns, + ) + + def _make_lineage_query(self) -> str: + databases_filter = ( + "" + if not self.config.databases + else "and default_database in ({databases})".format( + databases=",".join([f"'{db}'" for db in self.config.databases]) ) + ) + + query = self.QUERY_TEXT_QUERY.format( + start_time=self.config.start_time, + end_time=self.config.end_time, + databases_filter=databases_filter, + ) + return query def gen_lineage_from_query( self, @@ -182,10 +828,12 @@ def gen_lineage_from_query( default_database: Optional[str] = None, timestamp: Optional[datetime] = None, user: Optional[str] = None, - is_view_ddl: bool = False, + view_urn: Optional[str] = None, + urns: Optional[Set[str]] = None, ) -> Iterable[MetadataWorkUnit]: result = sqlglot_lineage( - sql=query, + # With this clever hack we can make the query parser to not fail on queries with CASESPECIFIC + sql=query.replace("(NOT CASESPECIFIC)", ""), schema_resolver=self.schema_resolver, default_db=None, default_schema=default_database @@ -194,17 +842,17 @@ def gen_lineage_from_query( ) if result.debug_info.table_error: logger.debug( - f"Error parsing table lineage, {result.debug_info.table_error}" + f"Error parsing table lineage ({view_urn}):\n{result.debug_info.table_error}" ) self.report.num_table_parse_failures += 1 else: yield from self.builder.process_sql_parsing_result( result, query=query, - is_view_ddl=is_view_ddl, + is_view_ddl=view_urn is not None, query_timestamp=timestamp, user=f"urn:li:corpuser:{user}", - include_urns=self.schema_resolver.get_urns(), + include_urns=urns, ) def get_metadata_engine(self) -> Engine: @@ -214,10 +862,17 @@ def get_metadata_engine(self) -> Engine: def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: # Add all schemas to the schema resolver - yield from super().get_workunits_internal() + # Sql parser operates on lowercase urns so we need to lowercase the urns + for wu in auto_lowercase_urns(super().get_workunits_internal()): + urn = wu.get_urn() + schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass) + if schema_metadata: + self.schema_resolver.add_schema_metadata(urn, schema_metadata) + yield wu + urns = self.schema_resolver.get_urns() if self.config.include_table_lineage or self.config.include_usage_statistics: self.report.report_ingestion_stage_start("audit log extraction") - yield from self.get_audit_log_mcps() + yield from self.get_audit_log_mcps(urns=urns) yield from self.builder.gen_workunits() diff --git a/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/data.model.lkml b/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/data.model.lkml new file mode 100644 index 00000000000000..183b16b2a3c1da --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/data.model.lkml @@ -0,0 +1,7 @@ +connection: "my_connection" + +include: "path1/foo.view.lkml" + +explore: aliased_explore { + from: my_view +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/data2.model.lkml b/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/data2.model.lkml new file mode 100644 index 00000000000000..6a4a96e2630fa9 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/data2.model.lkml @@ -0,0 +1,6 @@ +connection: "my_connection" +include: "path2/foo.view.lkml" + +explore: duplicate_explore { + from: my_view +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/path1/foo.view.lkml b/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/path1/foo.view.lkml new file mode 100644 index 00000000000000..40a981ebc7eb0d --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/path1/foo.view.lkml @@ -0,0 +1,47 @@ +view: my_view { + derived_table: { + sql: + SELECT + is_latest, + country, + city, + timestamp, + measurement + FROM + my_table ;; + } + + dimension: country { + type: string + description: "The country" + sql: ${TABLE}.country ;; + } + + dimension: city { + type: string + description: "City" + sql: ${TABLE}.city ;; + } + + dimension: is_latest { + type: yesno + description: "Is latest data" + sql: ${TABLE}.is_latest ;; + } + + dimension_group: timestamp { + group_label: "Timestamp" + type: time + description: "Timestamp of measurement" + sql: ${TABLE}.timestamp ;; + timeframes: [hour, date, week, day_of_week] + } + + measure: average_measurement { + group_label: "Measurement" + type: average + description: "My measurement" + sql: ${TABLE}.measurement ;; + } + +} diff --git a/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/path2/foo.view.lkml b/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/path2/foo.view.lkml new file mode 100644 index 00000000000000..8bd8138f973866 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_same_name_views_different_file_path_samples/path2/foo.view.lkml @@ -0,0 +1,41 @@ +view: my_view { + derived_table: { + sql: + SELECT + is_latest, + country, + city, + timestamp, + measurement + FROM + my_table ;; + } + + dimension: city { + type: string + description: "City" + sql: ${TABLE}.city ;; + } + + dimension: is_latest { + type: yesno + description: "Is latest data" + sql: ${TABLE}.is_latest ;; + } + + dimension_group: timestamp { + group_label: "Timestamp" + type: time + description: "Timestamp of measurement" + sql: ${TABLE}.timestamp ;; + timeframes: [hour, date, week, day_of_week] + } + + measure: average_measurement { + group_label: "Measurement" + type: average + description: "My measurement" + sql: ${TABLE}.measurement ;; + } + +} diff --git a/metadata-ingestion/tests/integration/lookml/lookml_same_name_views_different_file_path.json b/metadata-ingestion/tests/integration/lookml/lookml_same_name_views_different_file_path.json new file mode 100644 index 00000000000000..c212cc33b66d4e --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lookml_same_name_views_different_file_path.json @@ -0,0 +1,587 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path1.foo.view.my_view,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/looker/lkml_samples/path1/foo.view.lkml/views" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,warehouse.default_db.default_schema.my_table,DEV)", + "type": "VIEW" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "my_view", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "country", + "nullable": false, + "description": "The country", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "city", + "nullable": false, + "description": "City", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "is_latest", + "nullable": false, + "description": "Is latest data", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.BooleanType": {} + } + }, + "nativeDataType": "yesno", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "timestamp", + "nullable": false, + "description": "Timestamp of measurement", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + }, + { + "tag": "urn:li:tag:Temporal" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "average_measurement", + "nullable": false, + "description": "My measurement", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "average", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + } + ], + "primaryKeys": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "looker.file.path": "path1/foo.view.lkml" + }, + "name": "my_view", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path1.foo.view.my_view,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path1.foo.view.my_view,PROD)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "SELECT\n is_latest,\n country,\n city,\n timestamp,\n measurement\n FROM\n my_table", + "viewLanguage": "sql" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path1.foo.view.my_view,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "looker" + }, + { + "id": "lkml_samples" + }, + { + "id": "path1" + }, + { + "id": "foo.view.lkml" + }, + { + "id": "views" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path2.foo.view.my_view,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/looker/lkml_samples/path2/foo.view.lkml/views" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,warehouse.default_db.default_schema.my_table,DEV)", + "type": "VIEW" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "my_view", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "city", + "nullable": false, + "description": "City", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "is_latest", + "nullable": false, + "description": "Is latest data", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.BooleanType": {} + } + }, + "nativeDataType": "yesno", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "timestamp", + "nullable": false, + "description": "Timestamp of measurement", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + }, + { + "tag": "urn:li:tag:Temporal" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "average_measurement", + "nullable": false, + "description": "My measurement", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "average", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + } + ], + "primaryKeys": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "looker.file.path": "path2/foo.view.lkml" + }, + "name": "my_view", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path2.foo.view.my_view,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path2.foo.view.my_view,PROD)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "SELECT\n is_latest,\n country,\n city,\n timestamp,\n measurement\n FROM\n my_table", + "viewLanguage": "sql" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path2.foo.view.my_view,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "looker" + }, + { + "id": "lkml_samples" + }, + { + "id": "path2" + }, + { + "id": "foo.view.lkml" + }, + { + "id": "views" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Dimension", + "aspects": [ + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Dimension", + "description": "A tag that is applied to all dimension fields." + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Temporal", + "aspects": [ + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Temporal", + "description": "A tag that is applied to all time-based (temporal) fields such as timestamps or durations." + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Measure", + "aspects": [ + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Measure", + "description": "A tag that is applied to all measures (metrics). Measures are typically the columns that you aggregate on" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Dimension", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Measure", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Temporal", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index b1853cfa2b3c0a..a71b5978631483 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -802,3 +802,53 @@ def test_lookml_base_folder(): pydantic.ValidationError, match=r"base_folder.+not provided.+deploy_key" ): LookMLSourceConfig.parse_obj({"api": fake_api}) + + +@freeze_time(FROZEN_TIME) +def test_same_name_views_different_file_path(pytestconfig, tmp_path, mock_time): + """Test for reachable views""" + test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" + mce_out = "lookml_same_name_views_different_file_path.json" + pipeline = Pipeline.create( + { + "run_id": "lookml-test", + "source": { + "type": "lookml", + "config": { + "base_folder": str( + test_resources_dir + / "lkml_same_name_views_different_file_path_samples" + ), + "connection_to_platform_map": { + "my_connection": { + "platform": "snowflake", + "platform_instance": "warehouse", + "platform_env": "dev", + "default_db": "default_db", + "default_schema": "default_schema", + }, + }, + "parse_table_names_from_sql": True, + "project_name": "lkml_samples", + "process_refinements": False, + "view_naming_pattern": "{project}.{file_path}.view.{name}", + "view_browse_pattern": "/{env}/{platform}/{project}/{file_path}/views", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/{mce_out}", + }, + }, + } + ) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status(raise_warnings=True) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / mce_out, + golden_path=test_resources_dir / mce_out, + ) diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java index e12cbec87fe451..14ffc01d75781f 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java @@ -43,7 +43,7 @@ protected KafkaListenerContainerFactory createInstance(@Qualifier("configurat consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(","))); } // else we rely on KafkaProperties which defaults to localhost:9092 - Map customizedProperties = consumerProps.buildProperties(); + Map customizedProperties = properties.buildConsumerProperties(); customizedProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes()); diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactoryTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactoryTest.java new file mode 100644 index 00000000000000..408c7b67b25f02 --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactoryTest.java @@ -0,0 +1,32 @@ +package com.linkedin.gms.factory.kafka; + +import com.linkedin.gms.factory.config.ConfigurationProvider; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +@SpringBootTest( + properties = { + "spring.kafka.properties.security.protocol=SSL" + }, + classes = { + SimpleKafkaConsumerFactory.class, + ConfigurationProvider.class + }) +@EnableConfigurationProperties(ConfigurationProvider.class) +public class SimpleKafkaConsumerFactoryTest extends AbstractTestNGSpringContextTests { + @Autowired + ConcurrentKafkaListenerContainerFactory testFactory; + + @Test + void testInitialization() { + assertNotNull(testFactory); + assertEquals(testFactory.getConsumerFactory().getConfigurationProperties().get("security.protocol"), "SSL"); + } +}