diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index de3c1be441b41..7c8487727c9ee 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -130,6 +130,8 @@ class Constant: APP_SUB_TYPE = "App" STATE = "state" ACTIVE = "Active" + SQL_PARSING_FAILURE = "SQL Parsing Failure" + M_QUERY_NULL = '"null"' @dataclass @@ -175,6 +177,11 @@ class SupportedDataPlatform(Enum): powerbi_data_platform_name="Databricks", datahub_data_platform_name="databricks" ) + DatabricksMultiCloud_SQL = DataPlatformPair( + powerbi_data_platform_name="DatabricksMultiCloud", + datahub_data_platform_name="databricks", + ) + @dataclass class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport): @@ -199,6 +206,8 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport): m_query_parse_unexpected_character_errors: int = 0 m_query_parse_unknown_errors: int = 0 m_query_resolver_errors: int = 0 + m_query_resolver_no_lineage: int = 0 + m_query_resolver_successes: int = 0 def report_dashboards_scanned(self, count: int = 1) -> None: self.dashboards_scanned += count @@ -495,6 +504,18 @@ class PowerBiDashboardSourceConfig( description="Whether to ingest workspace app. Requires DataHub server 0.14.2+.", ) + m_query_parse_timeout: int = pydantic.Field( + default=70, + description="Timeout for PowerBI M-query parsing in seconds. Table-level lineage is determined by analyzing the M-query expression. " + "Increase this value if you encounter the 'M-Query Parsing Timeout' message in the connector report.", + ) + + metadata_api_timeout: int = pydantic.Field( + default=30, + description="timeout in seconds for Metadata Rest Api.", + hidden_from_docs=True, + ) + @root_validator(skip_on_failure=True) def validate_extract_column_level_lineage(cls, values: Dict) -> Dict: flags = [ diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py index 27efad6dc21ca..61b1164825257 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py @@ -1,4 +1,5 @@ import logging +import re from typing import List, Optional import sqlparse @@ -9,14 +10,29 @@ create_lineage_sql_parsed_result, ) -SPECIAL_CHARACTERS = ["#(lf)", "(lf)", "#(tab)"] +# It is the PowerBI M-Query way to mentioned \n , \t +SPECIAL_CHARACTERS = { + "#(lf)": "\n", + "(lf)": "\n", + "#(tab)": "\t", +} + +ANSI_ESCAPE_CHARACTERS = r"\x1b\[[0-9;]*m" logger = logging.getLogger(__name__) def remove_special_characters(native_query: str) -> str: for char in SPECIAL_CHARACTERS: - native_query = native_query.replace(char, " ") + native_query = native_query.replace(char, SPECIAL_CHARACTERS[char]) + + ansi_escape_regx = re.compile(ANSI_ESCAPE_CHARACTERS) + + native_query = ansi_escape_regx.sub("", native_query) + + # Replace "" quotes by ". Sqlglot is not handling column name alias surrounded with two double quotes + + native_query = native_query.replace('""', '"') return native_query @@ -53,6 +69,15 @@ def get_tables(native_query: str) -> List[str]: return tables +def remove_drop_statement(query: str) -> str: + # Certain PowerBI M-Queries contain a combination of DROP and SELECT statements within SQL, causing SQLParser to fail on these queries. + # Therefore, these occurrences are being removed. + # Regular expression to match patterns like "DROP TABLE IF EXISTS #;" + pattern = r"DROP TABLE IF EXISTS #\w+;?" + + return re.sub(pattern, "", query) + + def parse_custom_sql( ctx: PipelineContext, query: str, @@ -65,12 +90,10 @@ def parse_custom_sql( logger.debug("Using sqlglot_lineage to parse custom sql") - sql_query = remove_special_characters(query) - - logger.debug(f"Processing native query = {sql_query}") + logger.debug(f"Processing native query using DataHub Sql Parser = {query}") return create_lineage_sql_parsed_result( - query=sql_query, + query=query, default_schema=schema, default_db=database, platform=platform, diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py index a61ad8d289b9d..15524137c0a85 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py @@ -37,15 +37,19 @@ def get_lark_parser() -> Lark: return Lark(grammar, start="let_expression", regex=True) -def _parse_expression(expression: str) -> Tree: +def _parse_expression(expression: str, parse_timeout: int = 60) -> Tree: lark_parser: Lark = get_lark_parser() # Replace U+00a0 NO-BREAK SPACE with a normal space. # Sometimes PowerBI returns expressions with this character and it breaks the parser. expression = expression.replace("\u00a0", " ") + # Parser resolves the variable=null value to variable='', and in the Tree we get empty string + # to distinguish between an empty and null set =null to ="null" + expression = expression.replace("=null", '="null"') + logger.debug(f"Parsing expression = {expression}") - with threading_timeout(_M_QUERY_PARSE_TIMEOUT): + with threading_timeout(parse_timeout): parse_tree: Tree = lark_parser.parse(expression) if TRACE_POWERBI_MQUERY_PARSER: @@ -74,30 +78,33 @@ def get_upstream_tables( ) try: - with reporter.m_query_parse_timer: - reporter.m_query_parse_attempts += 1 - parse_tree: Tree = _parse_expression(table.expression) - valid, message = validator.validate_parse_tree( - parse_tree, native_query_enabled=config.native_query_parsing + table.expression, native_query_enabled=config.native_query_parsing ) if valid is False: assert message is not None logger.debug(f"Validation failed: {message}") reporter.info( - title="Unsupported M-Query", - message="DataAccess function is not present in M-Query expression", + title="Non-Data Platform Expression", + message=message, context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}", ) reporter.m_query_parse_validation_errors += 1 return [] + + with reporter.m_query_parse_timer: + reporter.m_query_parse_attempts += 1 + parse_tree: Tree = _parse_expression( + table.expression, parse_timeout=config.m_query_parse_timeout + ) + except KeyboardInterrupt: raise except TimeoutException: reporter.m_query_parse_timeouts += 1 reporter.warning( title="M-Query Parsing Timeout", - message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.", + message=f"M-Query parsing timed out after {config.m_query_parse_timeout} seconds. Lineage for this table will not be extracted.", context=f"table-full-name={table.full_name}, expression={table.expression}", ) return [] @@ -112,7 +119,7 @@ def get_upstream_tables( reporter.m_query_parse_unknown_errors += 1 reporter.warning( - title="Unable to extract lineage from M-Query expression", + title="Unable to parse M-Query expression", message=f"Got an '{error_type}' while parsing the expression. Lineage will be missing for this table.", context=f"table-full-name={table.full_name}, expression={table.expression}", exc=e, @@ -132,6 +139,10 @@ def get_upstream_tables( platform_instance_resolver=platform_instance_resolver, ) + if lineage: + reporter.m_query_resolver_successes += 1 + else: + reporter.m_query_resolver_no_lineage += 1 return lineage except BaseException as e: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index cc51fcee14104..9eafde2f75ecd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -9,6 +9,7 @@ import datahub.emitter.mce_builder as builder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.powerbi.config import ( + Constant, DataBricksPlatformDetail, DataPlatformPair, PlatformDetail, @@ -117,18 +118,24 @@ class AbstractDataPlatformTableCreator(ABC): """ ctx: PipelineContext + table: Table config: PowerBiDashboardSourceConfig + reporter: PowerBiDashboardSourceReport platform_instance_resolver: AbstractDataPlatformInstanceResolver def __init__( self, ctx: PipelineContext, + table: Table, config: PowerBiDashboardSourceConfig, + reporter: PowerBiDashboardSourceReport, platform_instance_resolver: AbstractDataPlatformInstanceResolver, ) -> None: super().__init__() self.ctx = ctx + self.table = table self.config = config + self.reporter = reporter self.platform_instance_resolver = platform_instance_resolver @abstractmethod @@ -214,6 +221,10 @@ def parse_custom_sql( ) ) + query = native_sql_parser.remove_drop_statement( + native_sql_parser.remove_special_characters(query) + ) + parsed_result: Optional[ "SqlParsingResult" ] = native_sql_parser.parse_custom_sql( @@ -227,7 +238,19 @@ def parse_custom_sql( ) if parsed_result is None: - logger.debug("Failed to parse query") + self.reporter.info( + title=Constant.SQL_PARSING_FAILURE, + message="Fail to parse native sql present in PowerBI M-Query", + context=f"table-name={self.table.full_name}, sql={query}", + ) + return Lineage.empty() + + if parsed_result.debug_info and parsed_result.debug_info.table_error: + self.reporter.warning( + title=Constant.SQL_PARSING_FAILURE, + message="Fail to parse native sql present in PowerBI M-Query", + context=f"table-name={self.table.full_name}, error={parsed_result.debug_info.table_error},sql={query}", + ) return Lineage.empty() for urn in parsed_result.in_tables: @@ -290,8 +313,8 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC): Once DataAccessFunctionDetail instance is initialized thereafter MQueryResolver generates the DataPlatformTable with the help of AbstractDataPlatformTableCreator (see method resolve_to_data_platform_table_list). - Classes which extended from AbstractDataPlatformTableCreator knows how to convert generated DataAccessFunctionDetail instance - to respective DataPlatformTable instance as per dataplatform. + Classes which extended from AbstractDataPlatformTableCreator know how to convert generated DataAccessFunctionDetail instance + to the respective DataPlatformTable instance as per dataplatform. """ @@ -343,6 +366,22 @@ def get_argument_list(invoke_expression: Tree) -> Optional[Tree]: return argument_list + def take_first_argument(self, expression: Tree) -> Optional[Tree]: + + # function is not data-access function, lets process function argument + first_arg_tree: Optional[Tree] = tree_function.first_arg_list_func(expression) + + if first_arg_tree is None: + logger.debug( + f"Function invocation without argument in expression = {expression.pretty()}" + ) + self.reporter.report_warning( + f"{self.table.full_name}-variable-statement", + "Function invocation without argument", + ) + return None + return first_arg_tree + def _process_invoke_expression( self, invoke_expression: Tree ) -> Union[DataAccessFunctionDetail, List[str], None]: @@ -350,6 +389,9 @@ def _process_invoke_expression( data_access_func: str = tree_function.make_function_name(letter_tree) # The invoke function is either DataAccess function like PostgreSQL.Database() or # some other function like Table.AddColumn or Table.Combine and so on + + logger.debug(f"function-name: {data_access_func}") + if data_access_func in self.data_access_functions: arg_list: Optional[Tree] = MQueryResolver.get_argument_list( invoke_expression @@ -368,20 +410,8 @@ def _process_invoke_expression( identifier_accessor=None, ) - # function is not data-access function, lets process function argument - first_arg_tree: Optional[Tree] = tree_function.first_arg_list_func( - invoke_expression - ) - + first_arg_tree: Optional[Tree] = self.take_first_argument(invoke_expression) if first_arg_tree is None: - logger.debug( - f"Function invocation without argument in expression = {invoke_expression.pretty()}" - ) - self.reporter.report_warning( - title="M-Query Resolver Error", - message="Unable to extract lineage from parsed M-Query expression (function invocation without argument)", - context=f"{self.table.full_name}: function invocation without argument", - ) return None flat_arg_list: List[Tree] = tree_function.flat_argument_list(first_arg_tree) @@ -390,6 +420,40 @@ def _process_invoke_expression( return None first_argument: Tree = flat_arg_list[0] # take first argument only + + # Detect nested function calls in the first argument + # M-Query's data transformation pipeline: + # 1. Functions typically operate on tables/columns + # 2. First argument must be either: + # - A table variable name (referencing data source) + # - Another function that eventually leads to a table + # + # Example of nested functions: + # #"Split Column by Delimiter2" = Table.SplitColumn( + # Table.TransformColumnTypes(#"Removed Columns1", "KB") + # ) + # + # In this example: + # - The inner function Table.TransformColumnTypes takes #"Removed Columns1" + # (a table reference) as its first argument + # - Its result is then passed as the first argument to Table.SplitColumn + second_invoke_expression: Optional[ + Tree + ] = tree_function.first_invoke_expression_func(first_argument) + if second_invoke_expression: + # 1. The First argument is function call + # 2. That function's first argument references next table variable + first_arg_tree = self.take_first_argument(second_invoke_expression) + if first_arg_tree is None: + return None + + flat_arg_list = tree_function.flat_argument_list(first_arg_tree) + if len(flat_arg_list) == 0: + logger.debug("flat_arg_list is zero") + return None + + first_argument = flat_arg_list[0] # take first argument only + expression: Optional[Tree] = tree_function.first_list_expression_func( first_argument ) @@ -478,7 +542,7 @@ def internal( self.reporter.report_warning( title="Unable to extract lineage from M-Query expression", message="Lineage will be incomplete.", - context=f"table-full-name={self.table.full_name}: output-variable={current_identifier} not found in table expression", + context=f"table-full-name={self.table.full_name}, expression = {self.table.expression}, output-variable={current_identifier} not found in table expression", ) return None @@ -579,7 +643,9 @@ def resolve_to_data_platform_table_list( AbstractDataPlatformTableCreator ) = supported_resolver.get_table_full_name_creator()( ctx=ctx, + table=self.table, config=config, + reporter=self.reporter, platform_instance_resolver=platform_instance_resolver, ) @@ -680,8 +746,10 @@ def create_urn_using_old_parser( database = db_name schema = MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA else: - logger.warning( - f"Unsupported table format found {parsed_table} in query {query}" + self.reporter.warning( + title="Invalid table format", + message="The advanced SQL lineage feature (enable_advance_lineage_sql_construct) is disabled. Please either enable this feature or ensure the table is referenced as .. in the SQL.", + context=f"table-name={self.table.full_name}", ) continue @@ -715,7 +783,7 @@ def create_lineage( ) if len(arguments) == 2: - # It is regular case of MS-SQL + # It is a regular case of MS-SQL logger.debug("Handling with regular case") return self.two_level_access_pattern(data_access_func_detail) @@ -1032,6 +1100,7 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator): SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = { SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE, SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT, + SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name: SupportedDataPlatform.DatabricksMultiCloud_SQL, } current_data_platform: SupportedDataPlatform = SupportedDataPlatform.SNOWFLAKE @@ -1079,6 +1148,34 @@ def create_urn_using_old_parser(self, query: str, server: str) -> Lineage: column_lineage=[], ) + def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]: + if ( + data_access_tokens[0] + != SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name + ): + return None + try: + if "Database" in data_access_tokens: + index = data_access_tokens.index("Database") + if data_access_tokens[index + 1] != Constant.M_QUERY_NULL: + # Database name is explicitly set in argument + return data_access_tokens[index + 1] + + if "Name" in data_access_tokens: + index = data_access_tokens.index("Name") + # Next element is value of the Name. It is a database name + return data_access_tokens[index + 1] + + if "Catalog" in data_access_tokens: + index = data_access_tokens.index("Catalog") + # Next element is value of the Catalog. In Databricks Catalog can also be used in place of a database. + return data_access_tokens[index + 1] + + except IndexError as e: + logger.debug("Database name is not available", exc_info=e) + + return None + def create_lineage( self, data_access_func_detail: DataAccessFunctionDetail ) -> Lineage: @@ -1093,6 +1190,7 @@ def create_lineage( ) logger.debug(f"Flat argument list = {flat_argument_list}") return Lineage.empty() + data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list( tree_function.token_values(flat_argument_list[0]) ) @@ -1105,6 +1203,8 @@ def create_lineage( f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}" ) + return Lineage.empty() + if len(data_access_tokens[0]) < 3: logger.debug( f"Server is not available in argument list for data-platform {data_access_tokens[0]}. Returning empty " @@ -1115,8 +1215,7 @@ def create_lineage( self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[ data_access_tokens[0] ] - - # First argument is the query + # The First argument is the query sql_query: str = tree_function.strip_char_from_list( values=tree_function.remove_whitespaces_from_list( tree_function.token_values(flat_argument_list[1]) @@ -1134,10 +1233,12 @@ def create_lineage( server=server, ) + database_name: Optional[str] = self.get_db_name(data_access_tokens) + return self.parse_custom_sql( query=sql_query, server=server, - database=None, # database and schema is available inside custom sql as per PowerBI Behavior + database=database_name, schema=None, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py index dfcbcf1e6f462..186f03fe13639 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py @@ -46,8 +46,8 @@ def get_variable_statement(parse_tree: Tree, variable: str) -> Optional[Tree]: def get_first_rule(tree: Tree, rule: str) -> Optional[Tree]: """ - Lark library doesn't have advance search function. - This function will return the first tree of provided rule + Lark library doesn't have an advance search function. + This function will return the first tree of the provided rule :param tree: Tree to search for the expression rule :return: Tree """ @@ -99,7 +99,6 @@ def internal(node: Union[Tree, Token]) -> None: logger.debug(f"Unable to resolve parameter reference to {ref}") values.append(ref) elif isinstance(node, Token): - # This means we're probably looking at a literal. values.append(cast(Token, node).value) return else: @@ -120,10 +119,14 @@ def remove_whitespaces_from_list(values: List[str]) -> List[str]: return result +def strip_char(value: str, char: str = '"') -> str: + return value.strip(char) + + def strip_char_from_list(values: List[str], char: str = '"') -> List[str]: result: List[str] = [] for item in values: - result.append(item.strip(char)) + result.append(strip_char(item.strip(char), char=char)) return result diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/validator.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/validator.py index 9effdf5f97e30..ca2abf97c9f30 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/validator.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/validator.py @@ -1,28 +1,28 @@ import logging -from typing import List, Optional, Tuple +from typing import Optional, Tuple -from lark import Tree - -from datahub.ingestion.source.powerbi.m_query import resolver, tree_function +from datahub.ingestion.source.powerbi.m_query import resolver logger = logging.getLogger(__name__) def validate_parse_tree( - tree: Tree, native_query_enabled: bool = True + expression: str, native_query_enabled: bool = True ) -> Tuple[bool, Optional[str]]: """ - :param tree: tree to validate as per functions supported by m_parser module + :param expression: M-Query expression to check if supported data-function is present in expression :param native_query_enabled: Whether user want to extract lineage from native query - :return: first argument is False if validation is failed and second argument would contain the error message. - in the case of valid tree, the first argument is True and the second argument would be None. + :return: True or False. """ - functions: List[str] = tree_function.get_all_function_name(tree) - if len(functions) == 0: - return False, "Function calls not found" + function_names = [fun.value for fun in resolver.FunctionName] + if not any(fun in expression for fun in function_names): + return False, "DataAccess function is not present in M-Query expression." if native_query_enabled is False: - if resolver.FunctionName.NATIVE_QUERY.value in functions: - return False, "Lineage extraction from native query is disabled." + if resolver.FunctionName.NATIVE_QUERY.value in function_names: + return ( + False, + "Lineage extraction from native query is disabled. Enable native_query_parsing in recipe", + ) return True, None diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule index 29a5391e75be8..51a0dff288558 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule @@ -7,8 +7,20 @@ // whole relational_expression parse tree. // - Added letter_character_and_decimal_digit phrase and updated keyword_or_identifier phrase // - Added below pattern in argument_list -// | WS_INLINE? SQL_STRING -// | WS_INLINE? SQL_STRING "," argument_list +// | WS_INLINE? sql_string +// | WS_INLINE? sql_string "," argument_list +// - Added subtract_expression +// - Updated relational_expression, here below are the updates +// | subtract_expression +// | subtract_expression "<" relational_expression +// | subtract_expression ">" relational_expression +// | subtract_expression "<=" relational_expression +// | subtract_expression ">=" relational_expression +// - Added empty_string +// - Updated argument_list, below are the updates +// | empty_string +// | empty_string "," argument_list +// - Added sql_string in any_literal lexical_unit: lexical_elements? @@ -290,10 +302,16 @@ equality_expression: WS_INLINE? relational_expression | relational_expression WS_INLINE? "<>" WS_INLINE? equality_expression relational_expression: additive_expression + | subtract_expression | additive_expression "<" relational_expression | additive_expression ">" relational_expression | additive_expression "<=" relational_expression | additive_expression ">=" relational_expression + | subtract_expression "<" relational_expression + | subtract_expression ">" relational_expression + | subtract_expression "<=" relational_expression + | subtract_expression ">=" relational_expression + additive_expression: multiplicative_expression | multiplicative_expression "+" additive_expression @@ -301,6 +319,11 @@ additive_expression: multiplicative_expression | multiplicative_expression WS_INLINE? NEWLINE? WS_INLINE? "&" WS_INLINE? NEWLINE? WS_INLINE? additive_expression +subtract_expression: multiplicative_expression + | multiplicative_expression "-" additive_expression + | multiplicative_expression WS_INLINE? "_" WS_INLINE? additive_expression + | multiplicative_expression WS_INLINE? NEWLINE? WS_INLINE? "&" WS_INLINE? NEWLINE? WS_INLINE? additive_expression + multiplicative_expression: WS_INLINE? metadata_expression | metadata_expression WS_INLINE? "*" WS_INLINE? multiplicative_expression | metadata_expression "/" multiplicative_expression @@ -346,14 +369,23 @@ not_implemented_expression: "..." invoke_expression: "#"? primary_expression "(" NEWLINE? argument_list? NEWLINE? ")" -SQL_STRING: /\"((?:[^\"\\]|\\[\"]|\"\"|\#\(lf\))+)\"/ +empty_string: /"([^"]|\\")*"/ + +// SQL String specific rules +sql_content: /(?:[^\"\\]|\\[\"]|\"\"|\#\(lf\))+/ + +sql_string: "\"" sql_content "\"" argument_list: WS_INLINE? expression | WS_INLINE? expression WS_INLINE? "," WS_INLINE? argument_list - | WS_INLINE? SQL_STRING - | WS_INLINE? SQL_STRING "," argument_list + | WS_INLINE? sql_string + | WS_INLINE? sql_string "," argument_list | "\"" identifier "\"" | "\"" identifier "\"" "," argument_list + | "[" identifier "]" + | "[" identifier "]" "," argument_list + | empty_string + | empty_string "," argument_list | WS_INLINE | WS_INLINE? ESCAPED_STRING | WS_INLINE? ESCAPED_STRING "," argument_list @@ -576,6 +608,7 @@ any_literal: record_literal | number_literal | text_literal | null_literal + | sql_string %import common.WORD diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index b2afdc3e40931..cef2d098aebc4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -1512,7 +1512,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: if self.source_config.modified_since: # As modified_workspaces is not idempotent, hence we checkpoint for each powerbi workspace - # Because job_id is used as dictionary key, we have to set a new job_id + # Because job_id is used as a dictionary key, we have to set a new job_id # Refer to https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L390 self.stale_entity_removal_handler.set_job_id(workspace.id) self.state_provider.register_stateful_ingestion_usecase_handler( diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index 7a47c40976bec..a59d58519d6bf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -56,6 +56,19 @@ def is_http_failure(response: Response, message: str) -> bool: return True +class SessionWithTimeout(requests.Session): + timeout: int + + def __init__(self, timeout, *args, **kwargs): + super().__init__(*args, **kwargs) + self.timeout = timeout + + def request(self, method, url, **kwargs): + # Set the default timeout if none is provided + kwargs.setdefault("timeout", self.timeout) + return super().request(method, url, **kwargs) + + class DataResolverBase(ABC): SCOPE: str = "https://analysis.windows.net/powerbi/api/.default" MY_ORG_URL = "https://api.powerbi.com/v1.0/myorg" @@ -69,6 +82,7 @@ def __init__( client_id: str, client_secret: str, tenant_id: str, + metadata_api_timeout: int, ): self.__access_token: Optional[str] = None self.__access_token_expiry_time: Optional[datetime] = None @@ -84,7 +98,9 @@ def __init__( self.get_access_token() logger.info(f"Connected to {self._get_authority_url()}") - self._request_session = requests.Session() + + self._request_session = SessionWithTimeout(timeout=metadata_api_timeout) + # set re-try parameter for request_session self._request_session.mount( "https://", diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py index e137f175c15ad..b49f1f09fa966 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py @@ -69,12 +69,14 @@ def __init__( client_id=self.__config.client_id, client_secret=self.__config.client_secret, tenant_id=self.__config.tenant_id, + metadata_api_timeout=self.__config.metadata_api_timeout, ) self.__admin_api_resolver = AdminAPIResolver( client_id=self.__config.client_id, client_secret=self.__config.client_secret, tenant_id=self.__config.tenant_id, + metadata_api_timeout=self.__config.metadata_api_timeout, ) self.reporter: PowerBiDashboardSourceReport = reporter @@ -91,6 +93,14 @@ def log_http_error(self, message: str) -> Any: if isinstance(e, requests.exceptions.HTTPError): logger.warning(f"HTTP status-code = {e.response.status_code}") + if isinstance(e, requests.exceptions.Timeout): + url: str = e.request.url if e.request else "URL not available" + self.reporter.warning( + title="Metadata API Timeout", + message=f"Metadata endpoints are not reachable. Check network connectivity to PowerBI Service.", + context=f"url={url}", + ) + logger.debug(msg=message, exc_info=e) return e @@ -253,7 +263,7 @@ def get_workspaces(self) -> List[Workspace]: except: self.log_http_error(message="Unable to fetch list of workspaces") - raise # we want this exception to bubble up + # raise # we want this exception to bubble up workspaces = [ Workspace( diff --git a/metadata-ingestion/src/datahub/utilities/partition_executor.py b/metadata-ingestion/src/datahub/utilities/partition_executor.py index 237ffc6dc611b..aeabc5c55e6b2 100644 --- a/metadata-ingestion/src/datahub/utilities/partition_executor.py +++ b/metadata-ingestion/src/datahub/utilities/partition_executor.py @@ -53,7 +53,10 @@ def __init__(self, max_workers: int, max_pending: int) -> None: self.max_workers = max_workers self.max_pending = max_pending - self._executor = ThreadPoolExecutor(max_workers=max_workers) + self._executor = ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix=self.__class__.__name__, + ) # Each pending or executing request will acquire a permit from this semaphore. self._semaphore = BoundedSemaphore(max_pending + max_workers) @@ -261,11 +264,17 @@ def __init__( self.min_process_interval = min_process_interval assert self.max_workers > 1 - # We add one here to account for the clearinghouse worker thread. - self._executor = ThreadPoolExecutor(max_workers=max_workers + 1) + self._executor = ThreadPoolExecutor( + # We add one here to account for the clearinghouse worker thread. + max_workers=max_workers + 1, + thread_name_prefix=self.__class__.__name__, + ) self._clearinghouse_started = False + # pending_count includes the length of the pending list, plus the + # number of items sitting in the clearinghouse's internal queue. self._pending_count = BoundedSemaphore(max_pending) + self._pending: "queue.Queue[Optional[_BatchPartitionWorkItem]]" = queue.Queue( maxsize=max_pending ) @@ -294,10 +303,10 @@ def _clearinghouse_worker(self) -> None: # noqa: C901 def _handle_batch_completion( batch: List[_BatchPartitionWorkItem], future: Future ) -> None: - nonlocal workers_available - workers_available += 1 - with clearinghouse_state_lock: + nonlocal workers_available + workers_available += 1 + for item in batch: keys_no_longer_in_flight.add(item.key) self._pending_count.release() diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py index 17032a7359fef..f4613c524316e 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py +++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py @@ -1,12 +1,15 @@ import logging import sys +import time from typing import List, Tuple +from unittest.mock import MagicMock, patch import pytest from lark import Tree import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.source import StructuredLogLevel from datahub.ingestion.source.powerbi.config import ( PowerBiDashboardSourceConfig, PowerBiDashboardSourceReport, @@ -51,10 +54,38 @@ 'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog="data_analysis", Database="summary", EnableAutomaticProxyDiscovery=null]),\n vips_data_summary_dev = Source{[Item="vips_data",Schema="summary",Catalog="data_analysis"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(vips_data_summary_dev,{{"vipstartDate", type date}, {"enteredDate", type datetime}, {"estDraftDate", type datetime}, {"estPublDate", type datetime}})\nin\n #"Changed Type"', 'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="ORDERING"]}[Data], "SELECT#(lf) DISTINCT#(lf) T5.PRESENTMENT_START_DATE#(lf),T5.PRESENTMENT_END_DATE#(lf),T5.DISPLAY_NAME#(lf),T5.NAME#(tab)#(lf),T5.PROMO_DISPLAY_NAME#(lf),T5.REGION#(lf),T5.ID#(lf),T5.WALKOUT#(lf),T6.DEAL_ID#(lf),T6.TYPE#(lf),T5.FREE_PERIOD#(lf),T6.PRICE_MODIFICATION#(lf)#(lf)FROM#(lf)#(lf)(#(lf) SELECT #(lf) T1.NAME#(lf),DATE(T1.CREATED_AT) as CREATED_AT#(lf),T1.PROMO_CODE#(lf),T1.STATUS#(lf),DATE(T1.UPDATED_AT) as UPDATED_AT#(lf),T1.ID#(lf),T1.DISPLAY_NAME as PROMO_DISPLAY_NAME#(lf),T4.*#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) NAME#(lf),CREATED_AT#(lf),PROMO_CODE#(lf),STATUS#(lf),UPDATED_AT#(lf),ID#(lf),DISPLAY_NAME#(lf) FROM RAW.PROMOTIONS#(lf)#(lf)) T1#(lf)INNER JOIN#(lf)#(lf) (#(lf) SELECT #(lf) T3.PRODUCT_STATUS#(lf),T3.CODE#(lf),T3.REGION#(lf),T3.DISPLAY_ORDER_SEQUENCE#(lf),T3.PRODUCT_LINE_ID#(lf),T3.DISPLAY_NAME#(lf),T3.PRODUCT_TYPE#(lf),T3.ID as PROD_TBL_ID#(lf),T3.NAME as PROD_TBL_NAME#(lf),DATE(T2.PRESENTMENT_END_DATE) as PRESENTMENT_END_DATE#(lf),T2.PRICE_COMMITMENT_PERIOD#(lf),T2.NAME as SEAL_TBL_NAME#(lf),DATE(T2.CREATED_AT) as SEAL_TBL_CREATED_AT#(lf),T2.DESCRIPTION#(lf),T2.FREE_PERIOD#(lf),T2.WALKOUT#(lf),T2.PRODUCT_CAT_ID#(lf),T2.PROMOTION_ID#(lf),DATE(T2.PRESENTMENT_START_DATE) as PRESENTMENT_START_DATE#(lf),YEAR(T2.PRESENTMENT_START_DATE) as DEAL_YEAR_START#(lf),MONTH(T2.PRESENTMENT_START_DATE) as DEAL_MONTH_START#(lf),T2.DEAL_TYPE#(lf),DATE(T2.UPDATED_AT) as SEAL_TBL_UPDATED_AT#(lf),T2.ID as SEAL_TBL_ID#(lf),T2.STATUS as SEAL_TBL_STATUS#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) PRODUCT_STATUS#(lf),CODE#(lf),REGION#(lf),DISPLAY_ORDER_SEQUENCE#(lf),PRODUCT_LINE_ID#(lf),DISPLAY_NAME#(lf),PRODUCT_TYPE#(lf),ID #(lf),NAME #(lf) FROM#(lf) RAW.PRODUCTS#(lf)#(lf)) T3#(lf)INNER JOIN#(lf)(#(lf) SELECT#(lf) DISTINCT#(lf) PRESENTMENT_END_DATE#(lf),PRICE_COMMITMENT_PERIOD#(lf),NAME#(lf),CREATED_AT#(lf),DESCRIPTION#(lf),FREE_PERIOD#(lf),WALKOUT#(lf),PRODUCT_CAT_ID#(lf),PROMOTION_ID#(lf),PRESENTMENT_START_DATE#(lf),DEAL_TYPE#(lf),UPDATED_AT#(lf),ID#(lf),STATUS#(lf) FROM#(lf) RAW.DEALS#(lf)#(lf)) T2#(lf)ON#(lf)T3.ID = T2.PRODUCT_CAT_ID #(lf)WHERE#(lf)T2.PRESENTMENT_START_DATE >= \'2015-01-01\'#(lf)AND#(lf)T2.STATUS = \'active\'#(lf)#(lf))T4#(lf)ON#(lf)T1.ID = T4.PROMOTION_ID#(lf))T5#(lf)INNER JOIN#(lf)RAW.PRICE_MODIFICATIONS T6#(lf)ON#(lf)T5.SEAL_TBL_ID = T6.DEAL_ID", null, [EnableFolding=true]) \n in \n Source', 'let\n Source = Databricks.Catalogs(#"hostname",#"http_path", null),\n edp_prod_Database = Source{[Name=#"catalog",Kind="Database"]}[Data],\n gold_Schema = edp_prod_Database{[Name=#"schema",Kind="Schema"]}[Data],\n pet_view = gold_Schema{[Name="pet_list",Kind="View"]}[Data],\n #"Filtered Rows" = Table.SelectRows(pet_view, each true),\n #"Removed Columns" = Table.RemoveColumns(#"Filtered Rows",{"created_timestmp"})\nin\n #"Removed Columns"', - 'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="SL_OPERATIONS"]}[Data], "select SALE_NO AS ""SaleNo""#(lf) ,CODE AS ""Code""#(lf) ,ENDDATE AS ""end_date""#(lf) from SL_OPERATIONS.SALE.REPORTS#(lf) where ENDDATE > \'2024-02-03\'", null, [EnableFolding=true]),\n #"selected Row" = Table.SelectRows(Source)\nin\n #"selected Row"', + 'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="SL_OPERATIONS"]}[Data], "select SALE_NO AS ""\x1b[4mSaleNo\x1b[0m""#(lf) ,CODE AS ""Code""#(lf) ,ENDDATE AS ""end_date""#(lf) from SL_OPERATIONS.SALE.REPORTS#(lf) where ENDDATE > \'2024-02-03\'", null, [EnableFolding=true]),\n #"selected Row" = Table.SelectRows(Source)\nin\n #"selected Row"', + 'let\n Source = Value.NativeQuery(DatabricksMultiCloud.Catalogs("foo.com", "/sql/1.0/warehouses/423423ew", [Catalog="sales_db", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="sales_db",Kind="Database"]}[Data], "select * from public.slae_history#(lf)where creation_timestamp >= getDate(-3)", null, [EnableFolding=true]),\n #"NewTable" = Table.TransformColumn(Source,{{"creation_timestamp", type date}})\nin\n #"NewTable"', + 'let Source = Snowflake.Databases("example.snowflakecomputing.com","WAREHOUSE_NAME",[Role="CUSTOM_ROLE"]), DB_Source = Source{[Name="DATABASE_NAME",Kind="Database"]}[Data], SCHEMA_Source = DB_Source{[Name="SCHEMA_NAME",Kind="Schema"]}[Data], TABLE_Source = SCHEMA_Source{[Name="TABLE_NAME",Kind="View"]}[Data], #"Split Column by Time" = Table.SplitColumn(Table.TransformColumnTypes(TABLE_Source, {{"TIMESTAMP_COLUMN", type text}}, "en-GB"), "TIMESTAMP_COLUMN", Splitter.SplitTextByDelimiter(" ", QuoteStyle.Csv), {"TIMESTAMP_COLUMN.1", "TIMESTAMP_COLUMN.2"}), #"Added Custom" = Table.AddColumn(#"Split Column by Time", "SOB", each ([ENDTIME] - [STARTTIME]) * 60 * 60 * 24) in #"Added Custom"', + 'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="DROP TABLE IF EXISTS #KKR;#(lf)Select#(lf)*,#(lf)concat((UPPER(REPLACE(SALES_SPECIALIST,\'-\',\'\'))),#(lf)LEFT(CAST(INVOICE_DATE AS DATE),4)+LEFT(RIGHT(CAST(INVOICE_DATE AS DATE),5),2)) AS AGENT_KEY,#(lf)CASE#(lf) WHEN CLASS = \'Software\' and (NOT(PRODUCT in (\'ADV\', \'Adv\') and left(ACCOUNT_ID,2)=\'10\') #(lf) or V_ENTERPRISE_INVOICED_REVENUE.TYPE = \'Manual Adjustment\') THEN INVOICE_AMOUNT#(lf) WHEN V_ENTERPRISE_INVOICED_REVENUE.TYPE IN (\'Recurring\',\'0\') THEN INVOICE_AMOUNT#(lf) ELSE 0#(lf)END as SOFTWARE_INV#(lf)#(lf)from V_ENTERPRISE_INVOICED_REVENUE", CommandTimeout=#duration(0, 1, 30, 0)]),\n #"Added Conditional Column" = Table.AddColumn(Source, "Services", each if [CLASS] = "Services" then [INVOICE_AMOUNT] else 0),\n #"Added Custom" = Table.AddColumn(#"Added Conditional Column", "Advanced New Sites", each if [PRODUCT] = "ADV"\nor [PRODUCT] = "Adv"\nthen [NEW_SITE]\nelse 0)\nin\n #"Added Custom"', + "LOAD_DATA(SOURCE)", ] +def get_data_platform_tables_with_dummy_table(q: str) -> List[resolver.Lineage]: + table: powerbi_data_classes.Table = powerbi_data_classes.Table( + columns=[], + measures=[], + expression=q, + name="virtual_order_table", + full_name="OrderDataSet.virtual_order_table", + ) + + reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + + config.enable_advance_lineage_sql_construct = True + + return parser.get_upstream_tables( + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + ) + + def get_default_instances( override_config: dict = {}, ) -> Tuple[ @@ -681,6 +712,7 @@ def test_redshift_regular_case(): def test_redshift_native_query(): + table: powerbi_data_classes.Table = powerbi_data_classes.Table( expression=M_QUERIES[22], name="category", @@ -773,21 +805,13 @@ def test_sqlglot_parser(): def test_databricks_multi_cloud(): - table: powerbi_data_classes.Table = powerbi_data_classes.Table( - expression=M_QUERIES[25], - name="category", - full_name="dev.public.category", - ) - reporter = PowerBiDashboardSourceReport() + q = M_QUERIES[25] - ctx, config, platform_instance_resolver = get_default_instances() - data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, - reporter, - ctx=ctx, - config=config, - platform_instance_resolver=platform_instance_resolver, - )[0].upstreams + lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + + assert len(lineage) == 1 + + data_platform_tables = lineage[0].upstreams assert len(data_platform_tables) == 1 @@ -798,21 +822,13 @@ def test_databricks_multi_cloud(): def test_databricks_catalog_pattern_1(): - table: powerbi_data_classes.Table = powerbi_data_classes.Table( - expression=M_QUERIES[26], - name="category", - full_name="dev.public.category", - ) - reporter = PowerBiDashboardSourceReport() + q = M_QUERIES[26] - ctx, config, platform_instance_resolver = get_default_instances() - data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, - reporter, - ctx=ctx, - config=config, - platform_instance_resolver=platform_instance_resolver, - )[0].upstreams + lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + + assert len(lineage) == 1 + + data_platform_tables = lineage[0].upstreams assert len(data_platform_tables) == 1 @@ -936,6 +952,73 @@ def test_databricks_regular_case_with_view(): def test_snowflake_double_double_quotes(): q = M_QUERIES[30] + lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + + assert len(lineage) == 1 + + data_platform_tables = lineage[0].upstreams + + assert len(data_platform_tables) == 1 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sl_operations.sale.reports,PROD)" + ) + + +def test_databricks_multicloud(): + q = M_QUERIES[31] + + lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + + assert len(lineage) == 1 + + data_platform_tables = lineage[0].upstreams + + assert len(data_platform_tables) == 1 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:databricks,sales_db.public.slae_history,PROD)" + ) + + +def test_snowflake_multi_function_call(): + q = M_QUERIES[32] + + lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + + assert len(lineage) == 1 + + data_platform_tables = lineage[0].upstreams + + assert len(data_platform_tables) == 1 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,database_name.schema_name.table_name,PROD)" + ) + + +def test_mssql_drop_with_select(): + q = M_QUERIES[33] + + lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + + assert len(lineage) == 1 + + data_platform_tables = lineage[0].upstreams + + assert len(data_platform_tables) == 1 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_enterprise_invoiced_revenue,PROD)" + ) + + +def test_unsupported_data_platform(): + q = M_QUERIES[34] table: powerbi_data_classes.Table = powerbi_data_classes.Table( columns=[], measures=[], @@ -948,16 +1031,119 @@ def test_snowflake_double_double_quotes(): ctx, config, platform_instance_resolver = get_default_instances() - data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( + config.enable_advance_lineage_sql_construct = True + + assert ( + parser.get_upstream_tables( + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + ) + == [] + ) + + info_entries: dict = reporter._structured_logs._entries.get( + StructuredLogLevel.INFO, {} + ) # type :ignore + + is_entry_present: bool = False + for key, entry in info_entries.items(): + if entry.title == "Non-Data Platform Expression": + is_entry_present = True + break + + assert ( + is_entry_present + ), 'Info message "Non-Data Platform Expression" should be present in reporter' + + +def test_empty_string_in_m_query(): + # TRIM(TRIM(TRIM(AGENT_NAME, '\"\"'), '+'), '\\'') is in Query + q = "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TRIM(TRIM(TRIM(AGENT_NAME, '\"\"'), '+'), '\\'') AS TRIM_AGENT_NAME,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source" + + lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + + assert len(lineage) == 1 + + data_platform_tables = lineage[0].upstreams + + assert len(data_platform_tables) == 2 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit,PROD)" + ) + assert ( + data_platform_tables[1].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)" + ) + + +def test_double_quotes_in_alias(): + # SELECT CAST(sales_date AS DATE) AS \"\"Date\"\" in query + q = 'let \n Source = Sql.Database("abc.com", "DB", [Query="SELECT CAST(sales_date AS DATE) AS ""Date"",#(lf) SUM(cshintrpret) / 60.0 AS ""Total Order All Items"",#(lf)#(tab)#(tab)#(tab) SUM(cshintrpret) / 60.0 - LAG(SUM(cshintrpret) / 60.0, 1) OVER (ORDER BY CAST(sales_date AS DATE)) AS ""Total minute difference"",#(lf)#(tab)#(tab)#(tab) SUM(sale_price) / 60.0 - LAG(SUM(sale_price) / 60.0, 1) OVER (ORDER BY CAST(sales_date AS DATE)) AS ""Normal minute difference""#(lf) FROM [DB].[dbo].[sales_t]#(lf) WHERE sales_date >= GETDATE() - 365#(lf) GROUP BY CAST(sales_date AS DATE),#(lf)#(tab)#(tab)CAST(sales_date AS TIME);"]) \n in \n Source' + + lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + + assert len(lineage) == 1 + + data_platform_tables = lineage[0].upstreams + + assert len(data_platform_tables) == 1 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:mssql,db.dbo.sales_t,PROD)" + ) + + +@patch("datahub.ingestion.source.powerbi.m_query.parser.get_lark_parser") +def test_m_query_timeout(mock_get_lark_parser): + + q = 'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="SL_OPERATIONS"]}[Data], "select SALE_NO AS ""\x1b[4mSaleNo\x1b[0m""#(lf) ,CODE AS ""Code""#(lf) ,ENDDATE AS ""end_date""#(lf) from SL_OPERATIONS.SALE.REPORTS#(lf) where ENDDATE > \'2024-02-03\'", null, [EnableFolding=true]),\n #"selected Row" = Table.SelectRows(Source)\nin\n #"selected Row"' + + table: powerbi_data_classes.Table = powerbi_data_classes.Table( + columns=[], + measures=[], + expression=q, + name="virtual_order_table", + full_name="OrderDataSet.virtual_order_table", + ) + + reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + + config.enable_advance_lineage_sql_construct = True + + config.m_query_parse_timeout = 1 + + mock_lark_instance = MagicMock() + + mock_get_lark_parser.return_value = mock_lark_instance + # sleep for 5 seconds to trigger timeout + mock_lark_instance.parse.side_effect = lambda expression: time.sleep(5) + + parser.get_upstream_tables( table, reporter, ctx=ctx, config=config, platform_instance_resolver=platform_instance_resolver, - )[0].upstreams + ) + + warn_entries: dict = reporter._structured_logs._entries.get( + StructuredLogLevel.WARN, {} + ) # type :ignore + + is_entry_present: bool = False + for key, entry in warn_entries.items(): + if entry.title == "M-Query Parsing Timeout": + is_entry_present = True + break - assert len(data_platform_tables) == 1 assert ( - data_platform_tables[0].urn - == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sl_operations.sale.reports,PROD)" - ) + is_entry_present + ), 'Warning message "M-Query Parsing Timeout" should be present in reporter' diff --git a/metadata-ingestion/tests/unit/test_powerbi_parser.py b/metadata-ingestion/tests/unit/test_powerbi_parser.py index e53e8d7aee16f..31579f0c0abd3 100644 --- a/metadata-ingestion/tests/unit/test_powerbi_parser.py +++ b/metadata-ingestion/tests/unit/test_powerbi_parser.py @@ -1,13 +1,17 @@ import pytest from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceConfig +from datahub.ingestion.source.powerbi.config import ( + PowerBiDashboardSourceConfig, + PowerBiDashboardSourceReport, +) from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import ( ResolvePlatformInstanceFromDatasetTypeMapping, ) from datahub.ingestion.source.powerbi.m_query.resolver import ( MSSqlDataPlatformTableCreator, ) +from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table @pytest.fixture @@ -17,8 +21,16 @@ def creator(): client_id="test-client-id", client_secret="test-client-secret", ) + + table = Table( + name="test_table", + full_name="db.schema.test_table", + ) + return MSSqlDataPlatformTableCreator( ctx=PipelineContext(run_id="test-run-id"), + table=table, + reporter=PowerBiDashboardSourceReport(), config=config, platform_instance_resolver=ResolvePlatformInstanceFromDatasetTypeMapping( config